aut
*/
ClientBuilder enableBusyWait(boolean enableBusyWait);
+ /**
+ * Configure OpenTelemetry for Pulsar Client
+ *
+ * When you pass an OpenTelemetry instance, Pulsar client will emit metrics that can be exported in a variety
+ * of different methods.
+ *
+ * Refer to OpenTelemetry Java SDK documentation for
+ * how to configure OpenTelemetry and the metrics exporter.
+ *
+ * By default, Pulsar client will use the {@link io.opentelemetry.api.GlobalOpenTelemetry} instance. If an
+ * OpenTelemetry JVM agent is configured, the metrics will be reported, otherwise the metrics will be
+ * completely disabled.
+ *
+ * @param openTelemetry the OpenTelemetry instance
+ * @return the client builder instance
+ */
+ ClientBuilder openTelemetry(io.opentelemetry.api.OpenTelemetry openTelemetry);
+
/**
* The clock used by the pulsar client.
*
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java
index 7935e05d55b66..e488aa81151ce 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;
+import io.opentelemetry.api.OpenTelemetry;
import java.io.Serializable;
import java.util.Collections;
import java.util.Map;
@@ -29,9 +30,12 @@
*
*
All the stats are relative to the last recording period. The interval of the stats refreshes is configured with
* {@link ClientBuilder#statsInterval(long, java.util.concurrent.TimeUnit)} with a default of 1 minute.
+ *
+ * @deprecated use {@link ClientBuilder#openTelemetry(OpenTelemetry)} to enable stats
*/
@InterfaceAudience.Public
-@InterfaceStability.Stable
+@InterfaceStability.Evolving
+@Deprecated
public interface ConsumerStats extends Serializable {
/**
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerStats.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerStats.java
index a26c20e740d37..9a9ade73669dd 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerStats.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerStats.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;
+import io.opentelemetry.api.OpenTelemetry;
import java.io.Serializable;
import java.util.Collections;
import java.util.Map;
@@ -29,9 +30,12 @@
*
*
All the stats are relative to the last recording period. The interval of the stats refreshes is configured with
* {@link ClientBuilder#statsInterval(long, java.util.concurrent.TimeUnit)} with a default of 1 minute.
+ *
+ * @deprecated use {@link ClientBuilder#openTelemetry(OpenTelemetry)} to enable stats
*/
@InterfaceAudience.Public
-@InterfaceStability.Stable
+@InterfaceStability.Evolving
+@Deprecated
public interface ProducerStats extends Serializable {
/**
* @return the number of messages published in the last interval
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index 7424b12db5aa2..3917e2996e180 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -52,6 +52,16 @@
pkg
+
+ io.opentelemetry
+ opentelemetry-api
+
+
+
+ io.opentelemetry
+ opentelemetry-extension-incubator
+
+
${project.groupId}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index fc5c3a3c6798b..a3c9d1bc9ab48 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -263,8 +263,8 @@ public OpSendMsg createOpSendMsg() throws IOException {
// Because when invoke `ProducerImpl.processOpSendMsg` on flush,
// if `op.msg != null && isBatchMessagingEnabled()` checks true, it will call `batchMessageAndSend` to flush
// messageContainers before publishing this one-batch message.
- op = OpSendMsg.create(messages, cmd, messageMetadata.getSequenceId(), firstCallback,
- batchAllocatedSizeBytes);
+ op = OpSendMsg.create(producer.rpcLatencyHistogram, messages, cmd, messageMetadata.getSequenceId(),
+ firstCallback, batchAllocatedSizeBytes);
// NumMessagesInBatch and BatchSizeByte will not be serialized to the binary cmd. It's just useful for the
// ProducerStats
@@ -314,7 +314,7 @@ public OpSendMsg createOpSendMsg() throws IOException {
messageMetadata.getUncompressedSize(), encryptedPayload.readableBytes());
}
- OpSendMsg op = OpSendMsg.create(messages, cmd, messageMetadata.getSequenceId(),
+ OpSendMsg op = OpSendMsg.create(producer.rpcLatencyHistogram, messages, cmd, messageMetadata.getSequenceId(),
messageMetadata.getHighestSequenceId(), firstCallback, batchAllocatedSizeBytes);
op.setNumMessagesInBatch(numMessagesInBatch);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index bdf00844c1cd2..81c196c731f70 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -20,6 +20,7 @@
import static java.lang.String.format;
import io.netty.buffer.ByteBuf;
+import io.opentelemetry.api.common.Attributes;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
@@ -34,6 +35,7 @@
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType;
@@ -63,6 +65,11 @@ public class BinaryProtoLookupService implements LookupService {
private final ConcurrentHashMap>
partitionedMetadataInProgress = new ConcurrentHashMap<>();
+ private final LatencyHistogram histoGetBroker;
+ private final LatencyHistogram histoGetTopicMetadata;
+ private final LatencyHistogram histoGetSchema;
+ private final LatencyHistogram histoListTopics;
+
public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
boolean useTls,
@@ -84,6 +91,15 @@ public BinaryProtoLookupService(PulsarClientImpl client,
this.serviceNameResolver = new PulsarServiceNameResolver();
this.listenerName = listenerName;
updateServiceUrl(serviceUrl);
+
+ LatencyHistogram histo = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup.duration",
+ "Duration of lookup operations", null,
+ Attributes.builder().put("pulsar.lookup.transport-type", "binary").build());
+ histoGetBroker = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "topic").build());
+ histoGetTopicMetadata =
+ histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "metadata").build());
+ histoGetSchema = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "schema").build());
+ histoListTopics = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "list-topics").build());
}
@Override
@@ -99,12 +115,20 @@ public void updateServiceUrl(String serviceUrl) throws PulsarClientException {
* @return broker-socket-address that serves given topic
*/
public CompletableFuture getBroker(TopicName topicName) {
+ long startTime = System.nanoTime();
final MutableObject newFutureCreated = new MutableObject<>();
try {
return lookupInProgress.computeIfAbsent(topicName, tpName -> {
CompletableFuture newFuture =
findBroker(serviceNameResolver.resolveHost(), false, topicName, 0);
newFutureCreated.setValue(newFuture);
+
+ newFuture.thenRun(() -> {
+ histoGetBroker.recordSuccess(System.nanoTime() - startTime);
+ }).exceptionally(x -> {
+ histoGetBroker.recordFailure(System.nanoTime() - startTime);
+ return null;
+ });
return newFuture;
});
} finally {
@@ -224,6 +248,7 @@ private CompletableFuture findBroker(InetSocketAddress socket
private CompletableFuture getPartitionedTopicMetadata(InetSocketAddress socketAddress,
TopicName topicName) {
+ long startTime = System.nanoTime();
CompletableFuture partitionFuture = new CompletableFuture<>();
client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
@@ -231,11 +256,13 @@ private CompletableFuture getPartitionedTopicMetadata(
ByteBuf request = Commands.newPartitionMetadataRequest(topicName.toString(), requestId);
clientCnx.newLookup(request, requestId).whenComplete((r, t) -> {
if (t != null) {
+ histoGetTopicMetadata.recordFailure(System.nanoTime() - startTime);
log.warn("[{}] failed to get Partitioned metadata : {}", topicName,
t.getMessage(), t);
partitionFuture.completeExceptionally(t);
} else {
try {
+ histoGetTopicMetadata.recordSuccess(System.nanoTime() - startTime);
partitionFuture.complete(new PartitionedTopicMetadata(r.partitions));
} catch (Exception e) {
partitionFuture.completeExceptionally(new PulsarClientException.LookupException(
@@ -263,6 +290,7 @@ public CompletableFuture> getSchema(TopicName topicName) {
@Override
public CompletableFuture> getSchema(TopicName topicName, byte[] version) {
+ long startTime = System.nanoTime();
CompletableFuture> schemaFuture = new CompletableFuture<>();
if (version != null && version.length == 0) {
schemaFuture.completeExceptionally(new SchemaSerializationException("Empty schema version"));
@@ -275,10 +303,12 @@ public CompletableFuture> getSchema(TopicName topicName, by
Optional.ofNullable(BytesSchemaVersion.of(version)));
clientCnx.sendGetSchema(request, requestId).whenComplete((r, t) -> {
if (t != null) {
+ histoGetSchema.recordFailure(System.nanoTime() - startTime);
log.warn("[{}] failed to get schema : {}", topicName,
t.getMessage(), t);
schemaFuture.completeExceptionally(t);
} else {
+ histoGetSchema.recordSuccess(System.nanoTime() - startTime);
schemaFuture.complete(r);
}
client.getCnxPool().releaseConnection(clientCnx);
@@ -326,6 +356,8 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
Mode mode,
String topicsPattern,
String topicsHash) {
+ long startTime = System.nanoTime();
+
client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newGetTopicsOfNamespaceRequest(
@@ -333,8 +365,10 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
clientCnx.newGetTopicsOfNamespace(request, requestId).whenComplete((r, t) -> {
if (t != null) {
+ histoListTopics.recordFailure(System.nanoTime() - startTime);
getTopicsResultFuture.completeExceptionally(t);
} else {
+ histoListTopics.recordSuccess(System.nanoTime() - startTime);
if (log.isDebugEnabled()) {
log.debug("[namespace: {}] Success get topics list in request: {}",
namespace, requestId);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 9a86d81c93fab..2548a52aa95a8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
+import io.opentelemetry.api.OpenTelemetry;
import java.net.InetSocketAddress;
import java.time.Clock;
import java.util.List;
@@ -121,6 +122,12 @@ public ClientBuilder authentication(Authentication authentication) {
return this;
}
+ @Override
+ public ClientBuilder openTelemetry(OpenTelemetry openTelemetry) {
+ conf.setOpenTelemetry(openTelemetry);
+ return this;
+ }
+
@Override
public ClientBuilder authentication(String authPluginClassName, String authParamsString)
throws UnsupportedAuthenticationException {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 938a0b4d8f683..03e0f406dd2f2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -32,6 +32,7 @@
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import io.netty.util.concurrent.Promise;
+import io.opentelemetry.api.common.Attributes;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
@@ -60,6 +61,9 @@
import org.apache.pulsar.client.api.PulsarClientException.TimeoutException;
import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.metrics.Counter;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.apache.pulsar.client.impl.metrics.Unit;
import org.apache.pulsar.client.impl.schema.SchemaInfoUtil;
import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
import org.apache.pulsar.client.util.TimedCompletableFuture;
@@ -201,6 +205,9 @@ protected enum State {
None, SentConnectFrame, Ready, Failed, Connecting
}
+ private final Counter connectionsOpenedCounter;
+ private final Counter connectionsClosedCounter;
+
private static class RequestTime {
private final long creationTimeNanos;
final long requestId;
@@ -236,12 +243,13 @@ String getDescription() {
}
}
-
- public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
- this(conf, eventLoopGroup, Commands.getCurrentProtocolVersion());
+ public ClientCnx(InstrumentProvider instrumentProvider,
+ ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
+ this(instrumentProvider, conf, eventLoopGroup, Commands.getCurrentProtocolVersion());
}
- public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, int protocolVersion) {
+ public ClientCnx(InstrumentProvider instrumentProvider, ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
+ int protocolVersion) {
super(conf.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
checkArgument(conf.getMaxLookupRequest() > conf.getConcurrentLookupRequest());
this.pendingLookupRequestSemaphore = new Semaphore(conf.getConcurrentLookupRequest(), false);
@@ -257,11 +265,19 @@ public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, in
this.idleState = new ClientCnxIdleState(this);
this.clientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion()
+ (conf.getDescription() == null ? "" : ("-" + conf.getDescription()));
+ this.connectionsOpenedCounter =
+ instrumentProvider.newCounter("pulsar.client.connection.opened", Unit.Connections,
+ "The number of connections opened", null, Attributes.empty());
+ this.connectionsClosedCounter =
+ instrumentProvider.newCounter("pulsar.client.connection.closed", Unit.Connections,
+ "The number of connections closed", null, Attributes.empty());
+
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
+ connectionsOpenedCounter.increment();
this.localAddress = ctx.channel().localAddress();
this.remoteAddress = ctx.channel().remoteAddress();
@@ -304,6 +320,7 @@ protected ByteBuf newConnectCommand() throws Exception {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
+ connectionsClosedCounter.increment();
lastDisconnectedTimestamp = System.currentTimeMillis();
log.info("{} Disconnected", ctx.channel());
if (!connectionFuture.isDone()) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 850e805067d12..d5adbdd7098ed 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -32,6 +32,7 @@
import io.netty.resolver.dns.SequentialDnsServerAddressStreamProvider;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ScheduledFuture;
+import io.opentelemetry.api.common.Attributes;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
@@ -54,6 +55,9 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.metrics.Counter;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.apache.pulsar.client.impl.metrics.Unit;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.netty.DnsResolverUtil;
@@ -88,6 +92,8 @@ public class ConnectionPool implements AutoCloseable {
/** Async release useless connections task. **/
private ScheduledFuture asyncReleaseUselessConnectionsTask;
+ private final Counter connectionsTcpFailureCounter;
+ private final Counter connectionsHandshakeFailureCounter;
@Value
private static class Key {
@@ -96,16 +102,19 @@ private static class Key {
int randomKey;
}
- public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
- this(conf, eventLoopGroup, () -> new ClientCnx(conf, eventLoopGroup));
+ public ConnectionPool(InstrumentProvider instrumentProvider,
+ ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
+ this(instrumentProvider, conf, eventLoopGroup, () -> new ClientCnx(instrumentProvider, conf, eventLoopGroup));
}
- public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
+ public ConnectionPool(InstrumentProvider instrumentProvider,
+ ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
Supplier clientCnxSupplier) throws PulsarClientException {
- this(conf, eventLoopGroup, clientCnxSupplier, Optional.empty());
+ this(instrumentProvider, conf, eventLoopGroup, clientCnxSupplier, Optional.empty());
}
- public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
+ public ConnectionPool(InstrumentProvider instrumentProvider,
+ ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
Supplier clientCnxSupplier,
Optional> addressResolver)
throws PulsarClientException {
@@ -155,6 +164,14 @@ public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGrou
}
}, idleDetectionIntervalSeconds, idleDetectionIntervalSeconds, TimeUnit.SECONDS);
}
+
+ connectionsTcpFailureCounter =
+ instrumentProvider.newCounter("pulsar.client.connection.failed", Unit.Connections,
+ "The number of failed connection attempts", null,
+ Attributes.builder().put("pulsar.failure.type", "tcp-failed").build());
+ connectionsHandshakeFailureCounter = instrumentProvider.newCounter("pulsar.client.connection.failed",
+ Unit.Connections, "The number of failed connection attempts", null,
+ Attributes.builder().put("pulsar.failure.type", "handshake").build());
}
private static AddressResolver createAddressResolver(ClientConfigurationData conf,
@@ -295,6 +312,7 @@ private CompletableFuture createConnection(Key key) {
}
cnxFuture.complete(cnx);
}).exceptionally(exception -> {
+ connectionsHandshakeFailureCounter.increment();
log.warn("[{}] Connection handshake failed: {}", cnx.channel(), exception.getMessage());
cnxFuture.completeExceptionally(exception);
// this cleanupConnection may happen before that the
@@ -306,6 +324,7 @@ private CompletableFuture createConnection(Key key) {
return null;
});
}).exceptionally(exception -> {
+ connectionsTcpFailureCounter.increment();
eventLoopGroup.execute(() -> {
log.warn("Failed to open connection to {} : {}", key.physicalAddress, exception.getMessage());
pool.remove(key, cnxFuture);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 5a0e5de330d31..f1e259086ec8a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -34,6 +34,7 @@
import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timeout;
import io.netty.util.concurrent.FastThreadLocal;
+import io.opentelemetry.api.common.Attributes;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
@@ -91,6 +92,10 @@
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.client.impl.metrics.Counter;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.apache.pulsar.client.impl.metrics.Unit;
+import org.apache.pulsar.client.impl.metrics.UpDownCounter;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.ExecutorProvider;
@@ -216,6 +221,17 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle
private final boolean createTopicIfDoesNotExist;
private final boolean poolMessages;
+ private final Counter messagesReceivedCounter;
+ private final Counter bytesReceivedCounter;
+ private final UpDownCounter messagesPrefetchedGauge;
+ private final UpDownCounter bytesPrefetchedGauge;
+ private final Counter consumersOpenedCounter;
+ private final Counter consumersClosedCounter;
+ private final Counter consumerAcksCounter;
+ private final Counter consumerNacksCounter;
+
+ private final Counter consumerDlqMessagesCounter;
+
private final AtomicReference clientCnxUsedForConsumerRegistration = new AtomicReference<>();
private final List previousExceptions = new CopyOnWriteArrayList();
private volatile boolean hasSoughtByTimestamp = false;
@@ -389,7 +405,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
topicNameWithoutPartition = topicName.getPartitionedTopicName();
+ InstrumentProvider ip = client.instrumentProvider();
+ Attributes attrs = Attributes.builder().put("pulsar.subscription", subscription).build();
+ consumersOpenedCounter = ip.newCounter("pulsar.client.consumer.opened", Unit.Sessions,
+ "The number of consumer sessions opened", topic, attrs);
+ consumersClosedCounter = ip.newCounter("pulsar.client.consumer.closed", Unit.Sessions,
+ "The number of consumer sessions closed", topic, attrs);
+ messagesReceivedCounter = ip.newCounter("pulsar.client.consumer.message.received.count", Unit.Messages,
+ "The number of messages explicitly received by the consumer application", topic, attrs);
+ bytesReceivedCounter = ip.newCounter("pulsar.client.consumer.message.received.size", Unit.Bytes,
+ "The number of bytes explicitly received by the consumer application", topic, attrs);
+ messagesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.receive_queue.count", Unit.Messages,
+ "The number of messages currently sitting in the consumer receive queue", topic, attrs);
+ bytesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.receive_queue.size", Unit.Bytes,
+ "The total size in bytes of messages currently sitting in the consumer receive queue", topic, attrs);
+
+ consumerAcksCounter = ip.newCounter("pulsar.client.consumer.message.ack", Unit.Messages,
+ "The number of acknowledged messages", topic, attrs);
+ consumerNacksCounter = ip.newCounter("pulsar.client.consumer.message.nack", Unit.Messages,
+ "The number of negatively acknowledged messages", topic, attrs);
+ consumerDlqMessagesCounter = ip.newCounter("pulsar.client.consumer.message.dlq", Unit.Messages,
+ "The number of messages sent to DLQ", topic, attrs);
grabCnx();
+
+ consumersOpenedCounter.increment();
}
public ConnectionHandler getConnectionHandler() {
@@ -552,6 +591,8 @@ protected CompletableFuture> internalBatchReceiveAsync() {
protected CompletableFuture doAcknowledge(MessageId messageId, AckType ackType,
Map properties,
TransactionImpl txn) {
+ consumerAcksCounter.increment();
+
if (getState() != State.Ready && getState() != State.Connecting) {
stats.incrementNumAcksFailed();
PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + getState());
@@ -573,6 +614,8 @@ protected CompletableFuture doAcknowledge(MessageId messageId, AckType ack
@Override
protected CompletableFuture doAcknowledge(List messageIdList, AckType ackType,
Map properties, TransactionImpl txn) {
+ consumerAcksCounter.increment();
+
if (getState() != State.Ready && getState() != State.Connecting) {
stats.incrementNumAcksFailed();
PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + getState());
@@ -668,6 +711,8 @@ protected CompletableFuture doReconsumeLater(Message> message, AckType a
.value(retryMessage.getData())
.properties(propertiesMap);
typedMessageBuilderNew.sendAsync().thenAccept(msgId -> {
+ consumerDlqMessagesCounter.increment();
+
doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null).thenAccept(v -> {
result.complete(null);
}).exceptionally(ex -> {
@@ -760,6 +805,7 @@ private MessageImpl> getMessageImpl(Message> message) {
@Override
public void negativeAcknowledge(MessageId messageId) {
+ consumerNacksCounter.increment();
negativeAcksTracker.add(messageId);
// Ensure the message is not redelivered for ack-timeout, since we did receive an "ack"
@@ -768,6 +814,7 @@ public void negativeAcknowledge(MessageId messageId) {
@Override
public void negativeAcknowledge(Message> message) {
+ consumerNacksCounter.increment();
negativeAcksTracker.add(message);
// Ensure the message is not redelivered for ack-timeout, since we did receive an "ack"
@@ -1048,6 +1095,8 @@ public CompletableFuture closeAsync() {
return closeFuture;
}
+ consumersClosedCounter.increment();
+
if (!isConnected()) {
log.info("[{}] [{}] Closed Consumer (not connected)", topic, subscription);
setState(State.Closed);
@@ -1240,6 +1289,9 @@ protected MessageImpl newMessage(final MessageIdImpl messageId,
}
private void executeNotifyCallback(final MessageImpl message) {
+ messagesPrefetchedGauge.increment();
+ bytesPrefetchedGauge.add(message.size());
+
// Enqueue the message so that it can be retrieved when application calls receive()
// if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
// if asyncReceive is waiting then notify callback without adding to incomingMessages queue
@@ -1732,6 +1784,12 @@ protected synchronized void messageProcessed(Message> msg) {
ClientCnx msgCnx = ((MessageImpl>) msg).getCnx();
lastDequeuedMessageId = msg.getMessageId();
+ messagesPrefetchedGauge.decrement();
+ messagesReceivedCounter.increment();
+
+ bytesPrefetchedGauge.subtract(msg.size());
+ bytesReceivedCounter.add(msg.size());
+
if (msgCnx != currentCnx) {
// The processed message did belong to the old queue that was cleared after reconnection.
} else {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 02d0d10626fa6..8158b6d979efd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;
import io.netty.channel.EventLoopGroup;
+import io.opentelemetry.api.common.Attributes;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
@@ -34,6 +35,8 @@
import org.apache.pulsar.client.api.PulsarClientException.NotFoundException;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
import org.apache.pulsar.client.impl.schema.SchemaInfoUtil;
import org.apache.pulsar.client.impl.schema.SchemaUtils;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
@@ -60,11 +63,26 @@ public class HttpLookupService implements LookupService {
private static final String BasePathV1 = "lookup/v2/destination/";
private static final String BasePathV2 = "lookup/v2/topic/";
- public HttpLookupService(ClientConfigurationData conf, EventLoopGroup eventLoopGroup)
+ private final LatencyHistogram histoGetBroker;
+ private final LatencyHistogram histoGetTopicMetadata;
+ private final LatencyHistogram histoGetSchema;
+ private final LatencyHistogram histoListTopics;
+
+ public HttpLookupService(InstrumentProvider instrumentProvider, ClientConfigurationData conf,
+ EventLoopGroup eventLoopGroup)
throws PulsarClientException {
this.httpClient = new HttpClient(conf, eventLoopGroup);
this.useTls = conf.isUseTls();
this.listenerName = conf.getListenerName();
+
+ LatencyHistogram histo = instrumentProvider.newLatencyHistogram("pulsar.client.lookup.duration",
+ "Duration of lookup operations", null,
+ Attributes.builder().put("pulsar.lookup.transport-type", "http").build());
+ histoGetBroker = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "topic").build());
+ histoGetTopicMetadata =
+ histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "metadata").build());
+ histoGetSchema = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "schema").build());
+ histoListTopics = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "list-topics").build());
}
@Override
@@ -84,8 +102,18 @@ public CompletableFuture getBroker(TopicName topicName) {
String basePath = topicName.isV2() ? BasePathV2 : BasePathV1;
String path = basePath + topicName.getLookupName();
path = StringUtils.isBlank(listenerName) ? path : path + "?listenerName=" + Codec.encode(listenerName);
- return httpClient.get(path, LookupData.class)
- .thenCompose(lookupData -> {
+
+ long startTime = System.nanoTime();
+ CompletableFuture httpFuture = httpClient.get(path, LookupData.class);
+
+ httpFuture.thenRun(() -> {
+ histoGetBroker.recordSuccess(System.nanoTime() - startTime);
+ }).exceptionally(x -> {
+ histoGetBroker.recordFailure(System.nanoTime() - startTime);
+ return null;
+ });
+
+ return httpFuture.thenCompose(lookupData -> {
// Convert LookupData into as SocketAddress, handling exceptions
URI uri = null;
try {
@@ -112,9 +140,21 @@ public CompletableFuture getBroker(TopicName topicName) {
@Override
public CompletableFuture getPartitionedTopicMetadata(TopicName topicName) {
+ long startTime = System.nanoTime();
+
String format = topicName.isV2() ? "admin/v2/%s/partitions" : "admin/%s/partitions";
- return httpClient.get(String.format(format, topicName.getLookupName()) + "?checkAllowAutoCreation=true",
+ CompletableFuture httpFuture = httpClient.get(
+ String.format(format, topicName.getLookupName()) + "?checkAllowAutoCreation=true",
PartitionedTopicMetadata.class);
+
+ httpFuture.thenRun(() -> {
+ histoGetTopicMetadata.recordSuccess(System.nanoTime() - startTime);
+ }).exceptionally(x -> {
+ histoGetTopicMetadata.recordFailure(System.nanoTime() - startTime);
+ return null;
+ });
+
+ return httpFuture;
}
@Override
@@ -130,6 +170,8 @@ public InetSocketAddress resolveHost() {
@Override
public CompletableFuture getTopicsUnderNamespace(NamespaceName namespace, Mode mode,
String topicsPattern, String topicsHash) {
+ long startTime = System.nanoTime();
+
CompletableFuture future = new CompletableFuture<>();
String format = namespace.isV2()
@@ -152,6 +194,14 @@ public CompletableFuture getTopicsUnderNamespace(NamespaceName
future.completeExceptionally(cause);
return null;
});
+
+ future.thenRun(() -> {
+ histoListTopics.recordSuccess(System.nanoTime() - startTime);
+ }).exceptionally(x -> {
+ histoListTopics.recordFailure(System.nanoTime() - startTime);
+ return null;
+ });
+
return future;
}
@@ -162,6 +212,7 @@ public CompletableFuture> getSchema(TopicName topicName) {
@Override
public CompletableFuture> getSchema(TopicName topicName, byte[] version) {
+ long startTime = System.nanoTime();
CompletableFuture> future = new CompletableFuture<>();
String schemaName = topicName.getSchemaName();
@@ -201,6 +252,13 @@ public CompletableFuture> getSchema(TopicName topicName, by
}
return null;
});
+
+ future.thenRun(() -> {
+ histoGetSchema.recordSuccess(System.nanoTime() - startTime);
+ }).exceptionally(x -> {
+ histoGetSchema.recordFailure(System.nanoTime() - startTime);
+ return null;
+ });
return future;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 880185f7a9781..dbd3aae426900 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -40,6 +40,7 @@
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.ScheduledFuture;
+import io.opentelemetry.api.common.Attributes;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
@@ -76,6 +77,11 @@
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.client.impl.metrics.Counter;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
+import org.apache.pulsar.client.impl.metrics.Unit;
+import org.apache.pulsar.client.impl.metrics.UpDownCounter;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.MathUtils;
@@ -171,6 +177,15 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne
private boolean errorState;
+ private final LatencyHistogram latencyHistogram;
+ final LatencyHistogram rpcLatencyHistogram;
+ private final Counter publishedBytesCounter;
+ private final UpDownCounter pendingMessagesUpDownCounter;
+ private final UpDownCounter pendingBytesUpDownCounter;
+
+ private final Counter producersOpenedCounter;
+ private final Counter producersClosedCounter;
+
public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf,
CompletableFuture> producerCreatedFuture, int partitionIndex, Schema schema,
ProducerInterceptors interceptors, Optional overrideProducerName) {
@@ -265,6 +280,26 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
}
+ InstrumentProvider ip = client.instrumentProvider();
+ latencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.message.send.duration",
+ "Publish latency experienced by the application, includes client batching time", topic,
+ Attributes.empty());
+ rpcLatencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.rpc.send.duration",
+ "Publish RPC latency experienced internally by the client when sending data to receiving an ack", topic,
+ Attributes.empty());
+ publishedBytesCounter = ip.newCounter("pulsar.client.producer.message.send.size",
+ Unit.Bytes, "The number of bytes published", topic, Attributes.empty());
+ pendingMessagesUpDownCounter =
+ ip.newUpDownCounter("pulsar.client.producer.message.pending.count", Unit.Messages,
+ "The number of messages in the producer internal send queue, waiting to be sent", topic,
+ Attributes.empty());
+ pendingBytesUpDownCounter = ip.newUpDownCounter("pulsar.client.producer.message.pending.size", Unit.Bytes,
+ "The size of the messages in the producer internal queue, waiting to sent", topic, Attributes.empty());
+ producersOpenedCounter = ip.newCounter("pulsar.client.producer.opened", Unit.Sessions,
+ "The number of producer sessions opened", topic, Attributes.empty());
+ producersClosedCounter = ip.newCounter("pulsar.client.producer.closed", Unit.Sessions,
+ "The number of producer sessions closed", topic, Attributes.empty());
+
this.connectionHandler = new ConnectionHandler(this,
new BackoffBuilder()
.setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
@@ -274,6 +309,7 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
this);
setChunkMaxMessageSize();
grabCnx();
+ producersOpenedCounter.increment();
}
private void setChunkMaxMessageSize() {
@@ -337,6 +373,11 @@ CompletableFuture internalSendAsync(Message> message) {
if (interceptors != null) {
interceptorMessage.getProperties();
}
+
+ int msgSize = interceptorMessage.getDataBuffer().readableBytes();
+ pendingMessagesUpDownCounter.increment();
+ pendingBytesUpDownCounter.add(msgSize);
+
sendAsync(interceptorMessage, new SendCallback() {
SendCallback nextCallback = null;
MessageImpl> nextMsg = null;
@@ -359,15 +400,22 @@ public MessageImpl> getNextMessage() {
@Override
public void sendComplete(Exception e) {
+ long latencyNanos = System.nanoTime() - createdAt;
+ pendingMessagesUpDownCounter.decrement();
+ pendingBytesUpDownCounter.subtract(msgSize);
+
try {
if (e != null) {
+ latencyHistogram.recordFailure(latencyNanos);
stats.incrementSendFailed();
onSendAcknowledgement(interceptorMessage, null, e);
future.completeExceptionally(e);
} else {
+ latencyHistogram.recordSuccess(latencyNanos);
+ publishedBytesCounter.add(msgSize);
onSendAcknowledgement(interceptorMessage, interceptorMessage.getMessageId(), null);
future.complete(interceptorMessage.getMessageId());
- stats.incrementNumAcksReceived(System.nanoTime() - createdAt);
+ stats.incrementNumAcksReceived(latencyNanos);
}
} finally {
interceptorMessage.getDataBuffer().release();
@@ -413,15 +461,16 @@ CompletableFuture internalSendWithTxnAsync(Message> message, Transa
} else {
CompletableFuture completableFuture = new CompletableFuture<>();
if (!((TransactionImpl) txn).checkIfOpen(completableFuture)) {
- return completableFuture;
+ return completableFuture;
}
return ((TransactionImpl) txn).registerProducedTopic(topic)
- .thenCompose(ignored -> internalSendAsync(message));
+ .thenCompose(ignored -> internalSendAsync(message));
}
}
/**
* Compress the payload if compression is configured.
+ *
* @param payload
* @return a new payload
*/
@@ -473,9 +522,10 @@ public void sendAsync(Message> message, SendCallback callback) {
if (!msg.isReplicated() && msgMetadata.hasProducerName()) {
PulsarClientException.InvalidMessageException invalidMessageException =
- new PulsarClientException.InvalidMessageException(
- format("The producer %s of the topic %s can not reuse the same message", producerName, topic),
- msg.getSequenceId());
+ new PulsarClientException.InvalidMessageException(
+ format("The producer %s of the topic %s can not reuse the same message", producerName,
+ topic),
+ msg.getSequenceId());
completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException);
compressedPayload.release();
return;
@@ -645,8 +695,8 @@ private void serializeAndSendMessage(MessageImpl> msg,
msgMetadata.setUuid(uuid);
}
msgMetadata.setChunkId(chunkId)
- .setNumChunksFromMsg(totalChunks)
- .setTotalChunkMsgSize(compressedPayloadSize);
+ .setNumChunksFromMsg(totalChunks)
+ .setTotalChunkMsgSize(compressedPayloadSize);
}
if (canAddToBatch(msg) && totalChunks <= 1) {
@@ -697,9 +747,9 @@ private void serializeAndSendMessage(MessageImpl> msg,
if (msg.getSchemaState() == MessageImpl.SchemaState.Ready) {
ByteBufPair cmd = sendMessage(producerId, sequenceId, numMessages, messageId, msgMetadata,
encryptedPayload);
- op = OpSendMsg.create(msg, cmd, sequenceId, callback);
+ op = OpSendMsg.create(rpcLatencyHistogram, msg, cmd, sequenceId, callback);
} else {
- op = OpSendMsg.create(msg, null, sequenceId, callback);
+ op = OpSendMsg.create(rpcLatencyHistogram, msg, null, sequenceId, callback);
final MessageMetadata finalMsgMetadata = msgMetadata;
op.rePopulate = () -> {
if (msgMetadata.hasChunkId()) {
@@ -780,8 +830,8 @@ private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback call
}
SchemaInfo schemaInfo = msg.hasReplicateFrom() ? msg.getSchemaInfoForReplicator() : msg.getSchemaInfo();
schemaInfo = Optional.ofNullable(schemaInfo)
- .filter(si -> si.getType().getValue() > 0)
- .orElse(Schema.BYTES.getSchemaInfo());
+ .filter(si -> si.getType().getValue() > 0)
+ .orElse(Schema.BYTES.getSchemaInfo());
getOrCreateSchemaAsync(cnx, schemaInfo).handle((v, ex) -> {
if (ex != null) {
Throwable t = FutureUtil.unwrapCompletionException(ex);
@@ -816,10 +866,10 @@ private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback call
private CompletableFuture getOrCreateSchemaAsync(ClientCnx cnx, SchemaInfo schemaInfo) {
if (!Commands.peerSupportsGetOrCreateSchema(cnx.getRemoteEndpointProtocolVersion())) {
return FutureUtil.failedFuture(
- new PulsarClientException.NotSupportedException(
- format("The command `GetOrCreateSchema` is not supported for the protocol version %d. "
- + "The producer is %s, topic is %s",
- cnx.getRemoteEndpointProtocolVersion(), producerName, topic)));
+ new PulsarClientException.NotSupportedException(
+ format("The command `GetOrCreateSchema` is not supported for the protocol version %d. "
+ + "The producer is %s, topic is %s",
+ cnx.getRemoteEndpointProtocolVersion(), producerName, topic)));
}
long requestId = client.newRequestId();
ByteBuf request = Commands.newGetOrCreateSchema(requestId, topic, schemaInfo);
@@ -891,7 +941,7 @@ private boolean canAddToBatch(MessageImpl> msg) {
private boolean canAddToCurrentBatch(MessageImpl> msg) {
return batchMessageContainer.haveEnoughSpace(msg)
- && (!isMultiSchemaEnabled(false) || batchMessageContainer.hasSameSchema(msg))
+ && (!isMultiSchemaEnabled(false) || batchMessageContainer.hasSameSchema(msg))
&& batchMessageContainer.hasSameTxn(msg);
}
@@ -920,30 +970,31 @@ private void doBatchSendAndAdd(MessageImpl> msg, SendCallback callback, ByteBu
private boolean isValidProducerState(SendCallback callback, long sequenceId) {
switch (getState()) {
- case Ready:
- // OK
- case Connecting:
- // We are OK to queue the messages on the client, it will be sent to the broker once we get the connection
- case RegisteringSchema:
- // registering schema
- return true;
- case Closing:
- case Closed:
- callback.sendComplete(
- new PulsarClientException.AlreadyClosedException("Producer already closed", sequenceId));
- return false;
- case ProducerFenced:
- callback.sendComplete(new PulsarClientException.ProducerFencedException("Producer was fenced"));
- return false;
- case Terminated:
- callback.sendComplete(
- new PulsarClientException.TopicTerminatedException("Topic was terminated", sequenceId));
- return false;
- case Failed:
- case Uninitialized:
- default:
- callback.sendComplete(new PulsarClientException.NotConnectedException(sequenceId));
- return false;
+ case Ready:
+ // OK
+ case Connecting:
+ // We are OK to queue the messages on the client, it will be sent to the broker once we get the
+ // connection
+ case RegisteringSchema:
+ // registering schema
+ return true;
+ case Closing:
+ case Closed:
+ callback.sendComplete(
+ new PulsarClientException.AlreadyClosedException("Producer already closed", sequenceId));
+ return false;
+ case ProducerFenced:
+ callback.sendComplete(new PulsarClientException.ProducerFencedException("Producer was fenced"));
+ return false;
+ case Terminated:
+ callback.sendComplete(
+ new PulsarClientException.TopicTerminatedException("Topic was terminated", sequenceId));
+ return false;
+ case Failed:
+ case Uninitialized:
+ default:
+ callback.sendComplete(new PulsarClientException.NotConnectedException(sequenceId));
+ return false;
}
}
@@ -1043,9 +1094,11 @@ private static final class LastSendFutureWrapper {
private LastSendFutureWrapper(CompletableFuture lastSendFuture) {
this.lastSendFuture = lastSendFuture;
}
+
static LastSendFutureWrapper create(CompletableFuture lastSendFuture) {
return new LastSendFutureWrapper(lastSendFuture);
}
+
public CompletableFuture handleOnce() {
return lastSendFuture.handle((ignore, t) -> {
if (t != null && THROW_ONCE_UPDATER.compareAndSet(this, FALSE, TRUE)) {
@@ -1070,6 +1123,7 @@ public CompletableFuture closeAsync() {
return CompletableFuture.completedFuture(null);
}
+ producersClosedCounter.increment();
closeProducerTasks();
ClientCnx cnx = cnx();
@@ -1276,9 +1330,10 @@ protected synchronized void recoverChecksumError(ClientCnx cnx, long sequenceId)
releaseSemaphoreForSendOp(op);
try {
op.sendComplete(
- new PulsarClientException.ChecksumException(
- format("The checksum of the message which is produced by producer %s to the topic "
- + "%s is corrupted", producerName, topic)));
+ new PulsarClientException.ChecksumException(
+ format("The checksum of the message which is produced by producer %s to the "
+ + "topic "
+ + "%s is corrupted", producerName, topic)));
} catch (Throwable t) {
log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic,
producerName, sequenceId, t);
@@ -1326,7 +1381,7 @@ protected synchronized void recoverNotAllowedError(long sequenceId, String error
*
* @param op
* @return returns true only if message is not modified and computed-checksum is same as previous checksum else
- * return false that means that message is corrupted. Returns true if checksum is not present.
+ * return false that means that message is corrupted. Returns true if checksum is not present.
*/
protected boolean verifyLocalBufferIsNotCorrupted(OpSendMsg op) {
ByteBufPair msg = op.cmd;
@@ -1402,6 +1457,7 @@ public ReferenceCounted touch(Object hint) {
}
protected static final class OpSendMsg {
+ LatencyHistogram rpcLatencyHistogram;
MessageImpl> msg;
List> msgs;
ByteBufPair cmd;
@@ -1421,6 +1477,7 @@ protected static final class OpSendMsg {
int chunkId = -1;
void initialize() {
+ rpcLatencyHistogram = null;
msg = null;
msgs = null;
cmd = null;
@@ -1440,9 +1497,11 @@ void initialize() {
chunkedMessageCtx = null;
}
- static OpSendMsg create(MessageImpl> msg, ByteBufPair cmd, long sequenceId, SendCallback callback) {
+ static OpSendMsg create(LatencyHistogram rpcLatencyHistogram, MessageImpl> msg, ByteBufPair cmd,
+ long sequenceId, SendCallback callback) {
OpSendMsg op = RECYCLER.get();
op.initialize();
+ op.rpcLatencyHistogram = rpcLatencyHistogram;
op.msg = msg;
op.cmd = cmd;
op.callback = callback;
@@ -1452,10 +1511,11 @@ static OpSendMsg create(MessageImpl> msg, ByteBufPair cmd, long sequenceId, Se
return op;
}
- static OpSendMsg create(List> msgs, ByteBufPair cmd, long sequenceId, SendCallback callback,
- int batchAllocatedSize) {
+ static OpSendMsg create(LatencyHistogram rpcLatencyHistogram, List> msgs, ByteBufPair cmd,
+ long sequenceId, SendCallback callback, int batchAllocatedSize) {
OpSendMsg op = RECYCLER.get();
op.initialize();
+ op.rpcLatencyHistogram = rpcLatencyHistogram;
op.msgs = msgs;
op.cmd = cmd;
op.callback = callback;
@@ -1469,10 +1529,12 @@ static OpSendMsg create(List> msgs, ByteBufPair cmd, long sequenc
return op;
}
- static OpSendMsg create(List> msgs, ByteBufPair cmd, long lowestSequenceId,
- long highestSequenceId, SendCallback callback, int batchAllocatedSize) {
+ static OpSendMsg create(LatencyHistogram rpcLatencyHistogram, List> msgs, ByteBufPair cmd,
+ long lowestSequenceId,
+ long highestSequenceId, SendCallback callback, int batchAllocatedSize) {
OpSendMsg op = RECYCLER.get();
op.initialize();
+ op.rpcLatencyHistogram = rpcLatencyHistogram;
op.msgs = msgs;
op.cmd = cmd;
op.callback = callback;
@@ -1497,30 +1559,38 @@ void updateSentTimestamp() {
void sendComplete(final Exception e) {
SendCallback callback = this.callback;
+
+ long now = System.nanoTime();
if (null != callback) {
Exception finalEx = e;
if (finalEx instanceof TimeoutException) {
TimeoutException te = (TimeoutException) e;
long sequenceId = te.getSequenceId();
- long ns = System.nanoTime();
+
//firstSentAt and lastSentAt maybe -1, it means that the message didn't flush to channel.
String errMsg = String.format(
- "%s : createdAt %s seconds ago, firstSentAt %s seconds ago, lastSentAt %s seconds ago, "
- + "retryCount %s",
- te.getMessage(),
- RelativeTimeUtil.nsToSeconds(ns - this.createdAt),
- RelativeTimeUtil.nsToSeconds(this.firstSentAt <= 0
- ? this.firstSentAt
- : ns - this.firstSentAt),
- RelativeTimeUtil.nsToSeconds(this.lastSentAt <= 0
- ? this.lastSentAt
- : ns - this.lastSentAt),
- retryCount
+ "%s : createdAt %s seconds ago, firstSentAt %s seconds ago, lastSentAt %s seconds ago, "
+ + "retryCount %s",
+ te.getMessage(),
+ RelativeTimeUtil.nsToSeconds(now - this.createdAt),
+ RelativeTimeUtil.nsToSeconds(this.firstSentAt <= 0
+ ? this.firstSentAt
+ : now - this.firstSentAt),
+ RelativeTimeUtil.nsToSeconds(this.lastSentAt <= 0
+ ? this.lastSentAt
+ : now - this.lastSentAt),
+ retryCount
);
finalEx = new TimeoutException(errMsg, sequenceId);
}
+ if (e == null) {
+ rpcLatencyHistogram.recordSuccess(now - this.lastSentAt);
+ } else {
+ rpcLatencyHistogram.recordFailure(now - this.lastSentAt);
+ }
+
callback.sendComplete(finalEx);
}
}
@@ -1687,7 +1757,7 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) {
long requestId = client.newRequestId();
PRODUCER_DEADLINE_UPDATER
- .compareAndSet(this, 0, System.currentTimeMillis() + client.getConfiguration().getOperationTimeoutMs());
+ .compareAndSet(this, 0, System.currentTimeMillis() + client.getConfiguration().getOperationTimeoutMs());
SchemaInfo schemaInfo = null;
if (schema != null) {
@@ -1698,7 +1768,7 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) {
// but now we have standardized on every schema to generate an Avro based schema
if (Commands.peerSupportJsonSchemaAvroFormat(cnx.getRemoteEndpointProtocolVersion())) {
schemaInfo = schema.getSchemaInfo();
- } else if (schema instanceof JSONSchema){
+ } else if (schema instanceof JSONSchema) {
JSONSchema jsonSchema = (JSONSchema) schema;
schemaInfo = jsonSchema.getBackwardsCompatibleJsonSchemaInfo();
} else {
@@ -1721,146 +1791,148 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) {
conf.getAccessMode(), topicEpoch, client.conf.isEnableTransaction(),
conf.getInitialSubscriptionName()),
requestId).thenAccept(response -> {
- String producerName = response.getProducerName();
- long lastSequenceId = response.getLastSequenceId();
- schemaVersion = Optional.ofNullable(response.getSchemaVersion());
- schemaVersion.ifPresent(v -> schemaCache.put(SchemaHash.of(schema), v));
-
- // We are now reconnected to broker and clear to send messages. Re-send all pending messages and
- // set the cnx pointer so that new messages will be sent immediately
- synchronized (ProducerImpl.this) {
- State state = getState();
- if (state == State.Closing || state == State.Closed) {
- // Producer was closed while reconnecting, close the connection to make sure the broker
- // drops the producer on its side
- cnx.removeProducer(producerId);
- cnx.channel().close();
- future.complete(null);
- return;
- }
- resetBackoff();
+ String producerName = response.getProducerName();
+ long lastSequenceId = response.getLastSequenceId();
+ schemaVersion = Optional.ofNullable(response.getSchemaVersion());
+ schemaVersion.ifPresent(v -> schemaCache.put(SchemaHash.of(schema), v));
- log.info("[{}] [{}] Created producer on cnx {}", topic, producerName, cnx.ctx().channel());
- connectionId = cnx.ctx().channel().toString();
- connectedSince = DateFormatter.now();
- if (conf.getAccessMode() != ProducerAccessMode.Shared && !topicEpoch.isPresent()) {
- log.info("[{}] [{}] Producer epoch is {}", topic, producerName, response.getTopicEpoch());
- }
- topicEpoch = response.getTopicEpoch();
+ // We are now reconnected to broker and clear to send messages. Re-send all pending messages and
+ // set the cnx pointer so that new messages will be sent immediately
+ synchronized (ProducerImpl.this) {
+ State state = getState();
+ if (state == State.Closing || state == State.Closed) {
+ // Producer was closed while reconnecting, close the connection to make sure the broker
+ // drops the producer on its side
+ cnx.removeProducer(producerId);
+ cnx.channel().close();
+ future.complete(null);
+ return;
+ }
+ resetBackoff();
- if (this.producerName == null) {
- this.producerName = producerName;
- }
+ log.info("[{}] [{}] Created producer on cnx {}", topic, producerName, cnx.ctx().channel());
+ connectionId = cnx.ctx().channel().toString();
+ connectedSince = DateFormatter.now();
+ if (conf.getAccessMode() != ProducerAccessMode.Shared && !topicEpoch.isPresent()) {
+ log.info("[{}] [{}] Producer epoch is {}", topic, producerName, response.getTopicEpoch());
+ }
+ topicEpoch = response.getTopicEpoch();
- if (this.msgIdGenerator == 0 && conf.getInitialSequenceId() == null) {
- // Only update sequence id generator if it wasn't already modified. That means we only want
- // to update the id generator the first time the producer gets established, and ignore the
- // sequence id sent by broker in subsequent producer reconnects
- this.lastSequenceIdPublished = lastSequenceId;
- this.msgIdGenerator = lastSequenceId + 1;
- }
+ if (this.producerName == null) {
+ this.producerName = producerName;
+ }
- resendMessages(cnx, epoch);
- }
- future.complete(null);
- }).exceptionally((e) -> {
- Throwable cause = e.getCause();
- cnx.removeProducer(producerId);
- State state = getState();
- if (state == State.Closing || state == State.Closed) {
- // Producer was closed while reconnecting, close the connection to make sure the broker
- // drops the producer on its side
- cnx.channel().close();
- future.complete(null);
- return null;
- }
+ if (this.msgIdGenerator == 0 && conf.getInitialSequenceId() == null) {
+ // Only update sequence id generator if it wasn't already modified. That means we only want
+ // to update the id generator the first time the producer gets established, and ignore the
+ // sequence id sent by broker in subsequent producer reconnects
+ this.lastSequenceIdPublished = lastSequenceId;
+ this.msgIdGenerator = lastSequenceId + 1;
+ }
- if (cause instanceof TimeoutException) {
- // Creating the producer has timed out. We need to ensure the broker closes the producer
- // in case it was indeed created, otherwise it might prevent new create producer operation,
- // since we are not necessarily closing the connection.
- long closeRequestId = client.newRequestId();
- ByteBuf cmd = Commands.newCloseProducer(producerId, closeRequestId);
- cnx.sendRequestWithId(cmd, closeRequestId);
- }
+ resendMessages(cnx, epoch);
+ }
+ future.complete(null);
+ }).exceptionally((e) -> {
+ Throwable cause = e.getCause();
+ cnx.removeProducer(producerId);
+ State state = getState();
+ if (state == State.Closing || state == State.Closed) {
+ // Producer was closed while reconnecting, close the connection to make sure the broker
+ // drops the producer on its side
+ cnx.channel().close();
+ future.complete(null);
+ return null;
+ }
- if (cause instanceof PulsarClientException.ProducerFencedException) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] Failed to create producer: {}",
- topic, producerName, cause.getMessage());
- }
- } else {
- log.error("[{}] [{}] Failed to create producer: {}", topic, producerName, cause.getMessage());
+ if (cause instanceof TimeoutException) {
+ // Creating the producer has timed out. We need to ensure the broker closes the producer
+ // in case it was indeed created, otherwise it might prevent new create producer operation,
+ // since we are not necessarily closing the connection.
+ long closeRequestId = client.newRequestId();
+ ByteBuf cmd = Commands.newCloseProducer(producerId, closeRequestId);
+ cnx.sendRequestWithId(cmd, closeRequestId);
+ }
+
+ if (cause instanceof PulsarClientException.ProducerFencedException) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}] Failed to create producer: {}",
+ topic, producerName, cause.getMessage());
+ }
+ } else {
+ log.error("[{}] [{}] Failed to create producer: {}", topic, producerName, cause.getMessage());
+ }
+ // Close the producer since topic does not exist.
+ if (cause instanceof PulsarClientException.TopicDoesNotExistException) {
+ closeAsync().whenComplete((v, ex) -> {
+ if (ex != null) {
+ log.error("Failed to close producer on TopicDoesNotExistException.", ex);
}
- // Close the producer since topic does not exist.
- if (cause instanceof PulsarClientException.TopicDoesNotExistException) {
- closeAsync().whenComplete((v, ex) -> {
- if (ex != null) {
- log.error("Failed to close producer on TopicDoesNotExistException.", ex);
- }
- producerCreatedFuture.completeExceptionally(cause);
- });
- future.complete(null);
- return null;
+ producerCreatedFuture.completeExceptionally(cause);
+ });
+ future.complete(null);
+ return null;
+ }
+ if (cause instanceof PulsarClientException.ProducerBlockedQuotaExceededException) {
+ synchronized (this) {
+ log.warn("[{}] [{}] Topic backlog quota exceeded. Throwing Exception on producer.", topic,
+ producerName);
+
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}] Pending messages: {}", topic, producerName,
+ pendingMessages.messagesCount());
}
- if (cause instanceof PulsarClientException.ProducerBlockedQuotaExceededException) {
- synchronized (this) {
- log.warn("[{}] [{}] Topic backlog quota exceeded. Throwing Exception on producer.", topic,
- producerName);
-
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] Pending messages: {}", topic, producerName,
- pendingMessages.messagesCount());
- }
-
- PulsarClientException bqe = new PulsarClientException.ProducerBlockedQuotaExceededException(
- format("The backlog quota of the topic %s that the producer %s produces to is exceeded",
+
+ PulsarClientException bqe = new PulsarClientException.ProducerBlockedQuotaExceededException(
+ format("The backlog quota of the topic %s that the producer %s produces to is exceeded",
topic, producerName));
- failPendingMessages(cnx(), bqe);
- }
- } else if (cause instanceof PulsarClientException.ProducerBlockedQuotaExceededError) {
- log.warn("[{}] [{}] Producer is blocked on creation because backlog exceeded on topic.",
- producerName, topic);
- }
+ failPendingMessages(cnx(), bqe);
+ }
+ } else if (cause instanceof PulsarClientException.ProducerBlockedQuotaExceededError) {
+ log.warn("[{}] [{}] Producer is blocked on creation because backlog exceeded on topic.",
+ producerName, topic);
+ }
- if (cause instanceof PulsarClientException.TopicTerminatedException) {
- setState(State.Terminated);
- synchronized (this) {
- failPendingMessages(cnx(), (PulsarClientException) cause);
- }
- producerCreatedFuture.completeExceptionally(cause);
- closeProducerTasks();
- client.cleanupProducer(this);
- } else if (cause instanceof PulsarClientException.ProducerFencedException) {
- setState(State.ProducerFenced);
- synchronized (this) {
- failPendingMessages(cnx(), (PulsarClientException) cause);
- }
- producerCreatedFuture.completeExceptionally(cause);
- closeProducerTasks();
- client.cleanupProducer(this);
- } else if (producerCreatedFuture.isDone()
- || (cause instanceof PulsarClientException && PulsarClientException.isRetriableError(cause)
- && System.currentTimeMillis() < PRODUCER_DEADLINE_UPDATER.get(ProducerImpl.this))) {
- // Either we had already created the producer once (producerCreatedFuture.isDone()) or we are
- // still within the initial timeout budget and we are dealing with a retriable error
- future.completeExceptionally(cause);
- } else {
- setState(State.Failed);
- producerCreatedFuture.completeExceptionally(cause);
- closeProducerTasks();
- client.cleanupProducer(this);
- Timeout timeout = sendTimeout;
- if (timeout != null) {
- timeout.cancel();
- sendTimeout = null;
- }
- }
- if (!future.isDone()) {
- future.complete(null);
- }
- return null;
- });
+ if (cause instanceof PulsarClientException.TopicTerminatedException) {
+ setState(State.Terminated);
+ synchronized (this) {
+ failPendingMessages(cnx(), (PulsarClientException) cause);
+ }
+ producerCreatedFuture.completeExceptionally(cause);
+ closeProducerTasks();
+ client.cleanupProducer(this);
+ } else if (cause instanceof PulsarClientException.ProducerFencedException) {
+ setState(State.ProducerFenced);
+ synchronized (this) {
+ failPendingMessages(cnx(), (PulsarClientException) cause);
+ }
+ producerCreatedFuture.completeExceptionally(cause);
+ closeProducerTasks();
+ client.cleanupProducer(this);
+ } else if (producerCreatedFuture.isDone() || (
+ cause instanceof PulsarClientException
+ && PulsarClientException.isRetriableError(cause)
+ && System.currentTimeMillis() < PRODUCER_DEADLINE_UPDATER.get(ProducerImpl.this)
+ )) {
+ // Either we had already created the producer once (producerCreatedFuture.isDone()) or we are
+ // still within the initial timeout budget and we are dealing with a retriable error
+ future.completeExceptionally(cause);
+ } else {
+ setState(State.Failed);
+ producerCreatedFuture.completeExceptionally(cause);
+ closeProducerTasks();
+ client.cleanupProducer(this);
+ Timeout timeout = sendTimeout;
+ if (timeout != null) {
+ timeout.cancel();
+ sendTimeout = null;
+ }
+ }
+ if (!future.isDone()) {
+ future.complete(null);
+ }
+ return null;
+ });
return future;
}
@@ -1966,7 +2038,7 @@ private void stripChecksum(OpSendMsg op) {
headerFrame.setInt(0, newTotalFrameSizeLength); // rewrite new [total-size]
ByteBuf metadata = headerFrame.slice(checksumMark, headerFrameSize - checksumMark); // sliced only
- // metadata
+ // metadata
headerFrame.writerIndex(headerSize); // set headerFrame write-index to overwrite metadata over checksum
metadata.readBytes(headerFrame, metadata.readableBytes());
headerFrame.capacity(headerFrameSize - checksumSize); // reduce capacity by removed checksum bytes
@@ -2078,6 +2150,7 @@ private void failPendingMessages(ClientCnx cnx, PulsarClientException ex) {
log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic, producerName,
op.sequenceId, t);
}
+
client.getMemoryLimitController().releaseMemory(op.uncompressedSize);
ReferenceCountUtil.safeRelease(op.cmd);
op.recycle();
@@ -2102,7 +2175,6 @@ private void failPendingMessages(ClientCnx cnx, PulsarClientException ex) {
/**
* fail any pending batch messages that were enqueued, however batch was not closed out.
- *
*/
private void failPendingBatchMessages(PulsarClientException ex) {
if (batchMessageContainer.isEmpty()) {
@@ -2122,7 +2194,7 @@ public CompletableFuture flushAsync() {
if (isBatchMessagingEnabled()) {
batchMessageAndSend(false);
}
- CompletableFuture lastSendFuture = this.lastSendFuture;
+ CompletableFuture lastSendFuture = this.lastSendFuture;
if (!(lastSendFuture == this.lastSendFutureWrapper.lastSendFuture)) {
this.lastSendFutureWrapper = LastSendFutureWrapper.create(lastSendFuture);
}
@@ -2241,7 +2313,7 @@ protected void processOpSendMsg(OpSendMsg op) {
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Connection is not ready -- sequenceId {}", topic, producerName,
- op.sequenceId);
+ op.sequenceId);
}
}
} catch (Throwable t) {
@@ -2257,7 +2329,8 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long e
// In this case, the cnx passed to this method is no longer the active connection. This method will get
// called again once the new connection registers the producer with the broker.
log.info("[{}][{}] Producer epoch mismatch or the current connection is null. Skip re-sending the "
- + " {} pending messages since they will deliver using another connection.", topic, producerName,
+ + " {} pending messages since they will deliver using another connection.", topic,
+ producerName,
pendingMessages.messagesCount());
return;
}
@@ -2298,7 +2371,7 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long e
op.cmd.retain();
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Re-Sending message in cnx {}, sequenceId {}", topic, producerName,
- cnx.channel(), op.sequenceId);
+ cnx.channel(), op.sequenceId);
}
cnx.ctx().write(op.cmd, cnx.ctx().voidPromise());
op.updateSentTimestamp();
@@ -2322,7 +2395,7 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long e
}
/**
- * Check if final message size for non-batch and non-chunked messages is larger than max message size.
+ * Check if final message size for non-batch and non-chunked messages is larger than max message size.
*/
private boolean isMessageSizeExceeded(OpSendMsg op) {
if (op.msg != null && !conf.isChunkingEnabled()) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 179996f4ea9f1..a919eb19a7ff8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -70,6 +70,7 @@
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
@@ -149,6 +150,8 @@ public SchemaInfoProvider load(String topicName) {
private final Clock clientClock;
+ private final InstrumentProvider instrumentProvider;
+
@Getter
private TransactionCoordinatorClientImpl tcClient;
@@ -176,6 +179,7 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG
Timer timer, ExecutorProvider externalExecutorProvider,
ExecutorProvider internalExecutorProvider,
ScheduledExecutorProvider scheduledExecutorProvider) throws PulsarClientException {
+
EventLoopGroup eventLoopGroupReference = null;
ConnectionPool connectionPoolReference = null;
try {
@@ -193,10 +197,12 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG
throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration");
}
this.conf = conf;
+ this.instrumentProvider = new InstrumentProvider(conf.getOpenTelemetry());
clientClock = conf.getClock();
conf.getAuthentication().start();
connectionPoolReference =
- connectionPool != null ? connectionPool : new ConnectionPool(conf, this.eventLoopGroup);
+ connectionPool != null ? connectionPool :
+ new ConnectionPool(instrumentProvider, conf, this.eventLoopGroup);
this.cnxPool = connectionPoolReference;
this.externalExecutorProvider = externalExecutorProvider != null ? externalExecutorProvider :
new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
@@ -205,7 +211,7 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG
this.scheduledExecutorProvider = scheduledExecutorProvider != null ? scheduledExecutorProvider :
new ScheduledExecutorProvider(conf.getNumIoThreads(), "pulsar-client-scheduled");
if (conf.getServiceUrl().startsWith("http")) {
- lookup = new HttpLookupService(conf, this.eventLoopGroup);
+ lookup = new HttpLookupService(instrumentProvider, conf, this.eventLoopGroup);
} else {
lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(),
conf.isUseTls(), this.scheduledExecutorProvider.getExecutor());
@@ -1053,7 +1059,7 @@ public void reloadLookUp() throws PulsarClientException {
public LookupService createLookup(String url) throws PulsarClientException {
if (url.startsWith("http")) {
- return new HttpLookupService(conf, eventLoopGroup);
+ return new HttpLookupService(instrumentProvider, conf, eventLoopGroup);
} else {
return new BinaryProtoLookupService(this, url, conf.getListenerName(), conf.isUseTls(),
externalExecutorProvider.getExecutor());
@@ -1231,6 +1237,11 @@ public ScheduledExecutorProvider getScheduledExecutorProvider() {
return scheduledExecutorProvider;
}
+ InstrumentProvider instrumentProvider() {
+ return instrumentProvider;
+ }
+
+
//
// Transaction related API
//
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index 20ec9c3d99af4..e755b6ba1ee6d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -22,6 +22,7 @@
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.FastThreadLocal;
+import io.opentelemetry.api.common.Attributes;
import java.io.Closeable;
import java.util.ArrayDeque;
import java.util.Collections;
@@ -35,6 +36,9 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.metrics.Counter;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.apache.pulsar.client.impl.metrics.Unit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +56,8 @@ public class UnAckedMessageTracker implements Closeable {
protected final long ackTimeoutMillis;
protected final long tickDurationInMs;
+ private final Counter consumerAckTimeoutsCounter;
+
private static class UnAckedMessageTrackerDisabled extends UnAckedMessageTracker {
@Override
public void clear() {
@@ -89,13 +95,14 @@ public void close() {
protected Timeout timeout;
- public UnAckedMessageTracker() {
+ private UnAckedMessageTracker() {
readLock = null;
writeLock = null;
timePartitions = null;
messageIdPartitionMap = null;
this.ackTimeoutMillis = 0;
this.tickDurationInMs = 0;
+ this.consumerAckTimeoutsCounter = null;
}
protected static final FastThreadLocal> TL_MESSAGE_IDS_SET =
@@ -114,6 +121,14 @@ public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase> consumerBa
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
this.readLock = readWriteLock.readLock();
this.writeLock = readWriteLock.writeLock();
+
+ InstrumentProvider ip = client.instrumentProvider();
+ consumerAckTimeoutsCounter = ip.newCounter("pulsar.client.consumer.message.ack.timeout", Unit.Messages,
+ "The number of messages that were not acknowledged in the configured timeout period, hence, were "
+ + "requested by the client to be redelivered",
+ consumerBase.getTopic(),
+ Attributes.builder().put("pulsar.subscription", consumerBase.getSubscription()).build());
+
if (conf.getAckTimeoutRedeliveryBackoff() == null) {
this.messageIdPartitionMap = new HashMap<>();
this.timePartitions = new ArrayDeque<>();
@@ -136,6 +151,7 @@ public void run(Timeout t) throws Exception {
try {
HashSet headPartition = timePartitions.removeFirst();
if (!headPartition.isEmpty()) {
+ consumerAckTimeoutsCounter.add(headPartition.size());
log.info("[{}] {} messages will be re-delivered", consumerBase, headPartition.size());
headPartition.forEach(messageId -> {
if (messageId instanceof ChunkMessageIdImpl) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 1dc1c2a8689c6..6dcea7dc46672 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -20,6 +20,7 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import io.opentelemetry.api.OpenTelemetry;
import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
import java.net.InetSocketAddress;
@@ -395,6 +396,8 @@ public class ClientConfigurationData implements Serializable, Cloneable {
)
private String description;
+ private transient OpenTelemetry openTelemetry;
+
/**
* Gets the authentication settings for the client.
*
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/Counter.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/Counter.java
new file mode 100644
index 0000000000000..fffbab4217a86
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/Counter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.metrics;
+
+import static org.apache.pulsar.client.impl.metrics.MetricsUtil.getDefaultAggregationLabels;
+import static org.apache.pulsar.client.impl.metrics.MetricsUtil.getTopicAttributes;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.LongCounterBuilder;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.extension.incubator.metrics.ExtendedLongCounterBuilder;
+
+public class Counter {
+
+ private final LongCounter counter;
+ private final Attributes attributes;
+
+ Counter(Meter meter, String name, Unit unit, String description, String topic, Attributes attributes) {
+ LongCounterBuilder builder = meter.counterBuilder(name)
+ .setDescription(description)
+ .setUnit(unit.toString());
+
+ if (topic != null) {
+ if (builder instanceof ExtendedLongCounterBuilder) {
+ ExtendedLongCounterBuilder eb = (ExtendedLongCounterBuilder) builder;
+ eb.setAttributesAdvice(getDefaultAggregationLabels(attributes));
+ }
+
+ attributes = getTopicAttributes(topic, attributes);
+ }
+
+ this.counter = builder.build();
+ this.attributes = attributes;
+ }
+
+ public void increment() {
+ add(1);
+ }
+
+ public void add(int delta) {
+ counter.add(delta, attributes);
+ }
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java
new file mode 100644
index 0000000000000..1e02af1fd37e1
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl.metrics;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
+import org.apache.pulsar.PulsarVersion;
+
+public class InstrumentProvider {
+
+ public static final InstrumentProvider NOOP = new InstrumentProvider(OpenTelemetry.noop());
+
+ private final Meter meter;
+
+ public InstrumentProvider(OpenTelemetry otel) {
+ if (otel == null) {
+ // By default, metrics are disabled, unless the OTel java agent is configured.
+ // This allows to enable metrics without any code change.
+ otel = GlobalOpenTelemetry.get();
+ }
+ this.meter = otel.getMeterProvider()
+ .meterBuilder("org.apache.pulsar.client")
+ .setInstrumentationVersion(PulsarVersion.getVersion())
+ .build();
+ }
+
+ public Counter newCounter(String name, Unit unit, String description, String topic, Attributes attributes) {
+ return new Counter(meter, name, unit, description, topic, attributes);
+ }
+
+ public UpDownCounter newUpDownCounter(String name, Unit unit, String description, String topic,
+ Attributes attributes) {
+ return new UpDownCounter(meter, name, unit, description, topic, attributes);
+ }
+
+ public LatencyHistogram newLatencyHistogram(String name, String description, String topic, Attributes attributes) {
+ return new LatencyHistogram(meter, name, description, topic, attributes);
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/LatencyHistogram.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/LatencyHistogram.java
new file mode 100644
index 0000000000000..ed04eff03b39d
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/LatencyHistogram.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl.metrics;
+
+import static org.apache.pulsar.client.impl.metrics.MetricsUtil.getDefaultAggregationLabels;
+import static org.apache.pulsar.client.impl.metrics.MetricsUtil.getTopicAttributes;
+import com.google.common.collect.Lists;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.DoubleHistogram;
+import io.opentelemetry.api.metrics.DoubleHistogramBuilder;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.extension.incubator.metrics.ExtendedDoubleHistogramBuilder;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class LatencyHistogram {
+
+ // Used for tests
+ public static final LatencyHistogram NOOP = new LatencyHistogram() {
+ public void recordSuccess(long latencyNanos) {
+ }
+
+ public void recordFailure(long latencyNanos) {
+ }
+ };
+
+ private static final List latencyHistogramBuckets =
+ Lists.newArrayList(.0005, .001, .0025, .005, .01, .025, .05, .1, .25, .5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0);
+
+ private static final double NANOS = TimeUnit.SECONDS.toNanos(1);
+
+ private final Attributes successAttributes;
+
+ private final Attributes failedAttributes;
+ private final DoubleHistogram histogram;
+
+ private LatencyHistogram() {
+ successAttributes = null;
+ failedAttributes = null;
+ histogram = null;
+ }
+
+ LatencyHistogram(Meter meter, String name, String description, String topic, Attributes attributes) {
+ DoubleHistogramBuilder builder = meter.histogramBuilder(name)
+ .setDescription(description)
+ .setUnit(Unit.Seconds.toString())
+ .setExplicitBucketBoundariesAdvice(latencyHistogramBuckets);
+
+ if (topic != null) {
+ if (builder instanceof ExtendedDoubleHistogramBuilder) {
+ ExtendedDoubleHistogramBuilder eb = (ExtendedDoubleHistogramBuilder) builder;
+ eb.setAttributesAdvice(
+ getDefaultAggregationLabels(
+ attributes.toBuilder().put("pulsar.response.status", "success").build()));
+ }
+ attributes = getTopicAttributes(topic, attributes);
+ }
+
+ successAttributes = attributes.toBuilder()
+ .put("pulsar.response.status", "success")
+ .build();
+ failedAttributes = attributes.toBuilder()
+ .put("pulsar.response.status", "failed")
+ .build();
+ this.histogram = builder.build();
+ }
+
+ private LatencyHistogram(DoubleHistogram histogram, Attributes successAttributes, Attributes failedAttributes) {
+ this.histogram = histogram;
+ this.successAttributes = successAttributes;
+ this.failedAttributes = failedAttributes;
+ }
+
+ /**
+ * Create a new histograms that inherits the old histograms attributes and adds new ones.
+ */
+ public LatencyHistogram withAttributes(Attributes attributes) {
+ return new LatencyHistogram(
+ histogram,
+ successAttributes.toBuilder().putAll(attributes).build(),
+ failedAttributes.toBuilder().putAll(attributes).build()
+ );
+ }
+
+
+ public void recordSuccess(long latencyNanos) {
+ histogram.record(latencyNanos / NANOS, successAttributes);
+ }
+
+ public void recordFailure(long latencyNanos) {
+ histogram.record(latencyNanos / NANOS, failedAttributes);
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/MetricsUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/MetricsUtil.java
new file mode 100644
index 0000000000000..b9802f4f32b5f
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/MetricsUtil.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl.metrics;
+
+import com.google.common.collect.Lists;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.experimental.UtilityClass;
+import org.apache.pulsar.common.naming.TopicName;
+
+@UtilityClass
+public class MetricsUtil {
+
+ // By default, advice to use namespace level aggregation only
+ private static final List> DEFAULT_AGGREGATION_LABELS = Lists.newArrayList(
+ AttributeKey.stringKey("pulsar.tenant"),
+ AttributeKey.stringKey("pulsar.namespace")
+ );
+
+ static List> getDefaultAggregationLabels(Attributes attrs) {
+ List> res = new ArrayList<>();
+ res.addAll(DEFAULT_AGGREGATION_LABELS);
+ res.addAll(attrs.asMap().keySet());
+ return res;
+ }
+
+ static Attributes getTopicAttributes(String topic, Attributes baseAttributes) {
+ TopicName tn = TopicName.get(topic);
+
+ AttributesBuilder ab = baseAttributes.toBuilder();
+ if (tn.isPartitioned()) {
+ ab.put("pulsar.partition", tn.getPartitionIndex());
+ }
+ ab.put("pulsar.topic", tn.getPartitionedTopicName());
+ ab.put("pulsar.namespace", tn.getNamespace());
+ ab.put("pulsar.tenant", tn.getTenant());
+ return ab.build();
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/Unit.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/Unit.java
new file mode 100644
index 0000000000000..5204cc2f03eae
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/Unit.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl.metrics;
+
+public enum Unit {
+ Bytes,
+
+ Messages,
+
+ Seconds,
+
+ Connections,
+
+ Sessions,
+
+ None,
+
+ ;
+
+ public String toString() {
+ switch (this) {
+ case Bytes:
+ return "By";
+
+ case Messages:
+ return "{message}";
+
+ case Seconds:
+ return "s";
+
+ case Connections:
+ return "{connection}";
+
+ case Sessions:
+ return "{session}";
+
+ case None:
+ default:
+ return "1";
+ }
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/UpDownCounter.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/UpDownCounter.java
new file mode 100644
index 0000000000000..3df0c2bb42302
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/UpDownCounter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl.metrics;
+
+import static org.apache.pulsar.client.impl.metrics.MetricsUtil.getDefaultAggregationLabels;
+import static org.apache.pulsar.client.impl.metrics.MetricsUtil.getTopicAttributes;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongUpDownCounter;
+import io.opentelemetry.api.metrics.LongUpDownCounterBuilder;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.extension.incubator.metrics.ExtendedLongUpDownCounterBuilder;
+
+public class UpDownCounter {
+
+ private final LongUpDownCounter counter;
+ private final Attributes attributes;
+
+ UpDownCounter(Meter meter, String name, Unit unit, String description, String topic, Attributes attributes) {
+ LongUpDownCounterBuilder builder = meter.upDownCounterBuilder(name)
+ .setDescription(description)
+ .setUnit(unit.toString());
+
+ if (topic != null) {
+ if (builder instanceof ExtendedLongUpDownCounterBuilder) {
+ ExtendedLongUpDownCounterBuilder eb = (ExtendedLongUpDownCounterBuilder) builder;
+ eb.setAttributesAdvice(getDefaultAggregationLabels(attributes));
+ }
+
+ attributes = getTopicAttributes(topic, attributes);
+ }
+
+ this.counter = builder.build();
+ this.attributes = attributes;
+ }
+
+ public void increment() {
+ add(1);
+ }
+
+ public void decrement() {
+ add(-1);
+ }
+
+ public void add(long delta) {
+ counter.add(delta, attributes);
+ }
+
+ public void subtract(long diff) {
+ add(-diff);
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/package-info.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/package-info.java
new file mode 100644
index 0000000000000..ee99bb3332c26
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Pulsar Client OTel metrics utilities
+ */
+package org.apache.pulsar.client.impl.metrics;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
index 1d1a6f85bfd41..514e3dde14070 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
@@ -41,6 +41,7 @@
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.client.util.TimedCompletableFuture;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
@@ -70,7 +71,7 @@ public void setup() throws NoSuchFieldException, IllegalAccessException {
doReturn(client).when(consumer).getClient();
doReturn(cnx).when(consumer).getClientCnx();
doReturn(new ConsumerStatsRecorderImpl()).when(consumer).getStats();
- doReturn(new UnAckedMessageTracker().UNACKED_MESSAGE_TRACKER_DISABLED)
+ doReturn(UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED)
.when(consumer).getUnAckedMessageTracker();
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
when(cnx.ctx()).thenReturn(ctx);
@@ -423,7 +424,7 @@ public void testDoIndividualBatchAckAsync() throws Exception{
public class ClientCnxTest extends ClientCnx {
public ClientCnxTest(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
- super(conf, eventLoopGroup);
+ super(InstrumentProvider.NOOP, conf, eventLoopGroup);
}
@Override
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
index 87188255b20b8..983cd21a7a9d8 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
@@ -36,6 +36,7 @@
import org.apache.pulsar.client.api.PulsarClientException.LookupException;
import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -65,6 +66,7 @@ public void setup() throws Exception {
doReturn(0).when(clientConfig).getMaxLookupRedirects();
PulsarClientImpl client = mock(PulsarClientImpl.class);
+ doReturn(InstrumentProvider.NOOP).when(client).instrumentProvider();
doReturn(cnxPool).when(client).getCnxPool();
doReturn(clientConfig).when(client).getConfiguration();
doReturn(1L).when(client).newRequestId();
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java
index ca6114d2ed823..d573229fddefa 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java
@@ -26,6 +26,7 @@
import io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.client.util.TimedCompletableFuture;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
@@ -60,7 +61,7 @@ void setupClientCnx() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setKeepAliveIntervalSeconds(0);
conf.setOperationTimeoutMs(1);
- cnx = new ClientCnx(conf, eventLoop);
+ cnx = new ClientCnx(InstrumentProvider.NOOP, conf, eventLoop);
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
Channel channel = mock(Channel.class);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index 4f657da82b289..bc1d940c76bbf 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -42,6 +42,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.BrokerMetadataException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.common.api.proto.CommandConnected;
@@ -63,7 +64,7 @@ public void testClientCnxTimeout() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setOperationTimeoutMs(10);
conf.setKeepAliveIntervalSeconds(0);
- ClientCnx cnx = new ClientCnx(conf, eventLoop);
+ ClientCnx cnx = new ClientCnx(InstrumentProvider.NOOP, conf, eventLoop);
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
Channel channel = mock(Channel.class);
@@ -89,7 +90,7 @@ public void testPendingLookupRequestSemaphore() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setOperationTimeoutMs(10_000);
conf.setKeepAliveIntervalSeconds(0);
- ClientCnx cnx = new ClientCnx(conf, eventLoop);
+ ClientCnx cnx = new ClientCnx(InstrumentProvider.NOOP, conf, eventLoop);
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
Channel channel = mock(Channel.class);
@@ -127,7 +128,7 @@ public void testPendingLookupRequestSemaphoreServiceNotReady() throws Exception
ClientConfigurationData conf = new ClientConfigurationData();
conf.setOperationTimeoutMs(10_000);
conf.setKeepAliveIntervalSeconds(0);
- ClientCnx cnx = new ClientCnx(conf, eventLoop);
+ ClientCnx cnx = new ClientCnx(InstrumentProvider.NOOP, conf, eventLoop);
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
Channel channel = mock(Channel.class);
@@ -170,7 +171,7 @@ public void testPendingWaitingLookupRequestSemaphore() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setOperationTimeoutMs(10_000);
conf.setKeepAliveIntervalSeconds(0);
- ClientCnx cnx = new ClientCnx(conf, eventLoop);
+ ClientCnx cnx = new ClientCnx(InstrumentProvider.NOOP, conf, eventLoop);
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
Channel channel = mock(Channel.class);
@@ -196,7 +197,7 @@ public void testReceiveErrorAtSendConnectFrameState() throws Exception {
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory);
ClientConfigurationData conf = new ClientConfigurationData();
conf.setOperationTimeoutMs(10);
- ClientCnx cnx = new ClientCnx(conf, eventLoop);
+ ClientCnx cnx = new ClientCnx(InstrumentProvider.NOOP, conf, eventLoop);
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
Channel channel = mock(Channel.class);
@@ -230,7 +231,7 @@ public void testGetLastMessageIdWithError() throws Exception {
ThreadFactory threadFactory = new DefaultThreadFactory("testReceiveErrorAtSendConnectFrameState");
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory);
ClientConfigurationData conf = new ClientConfigurationData();
- ClientCnx cnx = new ClientCnx(conf, eventLoop);
+ ClientCnx cnx = new ClientCnx(InstrumentProvider.NOOP, conf, eventLoop);
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
Channel channel = mock(Channel.class);
@@ -276,7 +277,7 @@ public void testHandleCloseConsumer() {
ThreadFactory threadFactory = new DefaultThreadFactory("testHandleCloseConsumer");
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory);
ClientConfigurationData conf = new ClientConfigurationData();
- ClientCnx cnx = new ClientCnx(conf, eventLoop);
+ ClientCnx cnx = new ClientCnx(InstrumentProvider.NOOP, conf, eventLoop);
long consumerId = 1;
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
@@ -300,7 +301,7 @@ public void testHandleCloseProducer() {
ThreadFactory threadFactory = new DefaultThreadFactory("testHandleCloseProducer");
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory);
ClientConfigurationData conf = new ClientConfigurationData();
- ClientCnx cnx = new ClientCnx(conf, eventLoop);
+ ClientCnx cnx = new ClientCnx(InstrumentProvider.NOOP, conf, eventLoop);
long producerId = 1;
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
@@ -393,7 +394,7 @@ private void withConnection(String testName, Consumer test) {
try {
ClientConfigurationData conf = new ClientConfigurationData();
- ClientCnx cnx = new ClientCnx(conf, eventLoop);
+ ClientCnx cnx = new ClientCnx(InstrumentProvider.NOOP, conf, eventLoop);
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
Channel channel = mock(Channel.class);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java
index 2db23782640eb..efcc06bede3e1 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java
@@ -23,6 +23,7 @@
import static org.testng.Assert.assertEquals;
import com.google.common.collect.Lists;
import java.util.Arrays;
+import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -39,7 +40,7 @@ public void createMockMessage() {
}
private ProducerImpl.OpSendMsg createDummyOpSendMsg() {
- return ProducerImpl.OpSendMsg.create(message, null, 0L, null);
+ return ProducerImpl.OpSendMsg.create(LatencyHistogram.NOOP, message, null, 0L, null);
}
@Test
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
index b96f6a344a3dc..f96d2e2e0b0e9 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
@@ -47,6 +47,7 @@
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.customroute.PartialRoundRobinMessageRouterImpl;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.assertj.core.util.Sets;
@@ -78,6 +79,7 @@ public void setup() {
producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES);
+ when(client.instrumentProvider()).thenReturn(InstrumentProvider.NOOP);
when(client.getConfiguration()).thenReturn(clientConfigurationData);
when(client.timer()).thenReturn(timer);
when(client.newProducer()).thenReturn(producerBuilderImpl);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java
index 6fcedc3f94de7..f9df63759394a 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java
@@ -27,6 +27,7 @@
import static org.testng.Assert.assertTrue;
import java.nio.ByteBuffer;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.mockito.Mockito;
import org.testng.annotations.Test;
@@ -42,6 +43,7 @@ public void testChunkedMessageCtxDeallocate() {
for (int i = 0; i < totalChunks; i++) {
ProducerImpl.OpSendMsg opSendMsg =
ProducerImpl.OpSendMsg.create(
+ LatencyHistogram.NOOP,
MessageImpl.create(new MessageMetadata(), ByteBuffer.allocate(0), Schema.STRING, null),
null, 0, null);
opSendMsg.chunkedMessageCtx = ctx;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
index c96443c1e2f9f..274b9b4f2d572 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java
@@ -53,6 +53,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.ScheduledExecutorProvider;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
@@ -180,7 +181,7 @@ public void testInitializeWithTimer() throws PulsarClientException {
ClientConfigurationData conf = new ClientConfigurationData();
@Cleanup("shutdownGracefully")
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("test"));
- ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
+ ConnectionPool pool = Mockito.spy(new ConnectionPool(InstrumentProvider.NOOP, conf, eventLoop));
conf.setServiceUrl("pulsar://localhost:6650");
HashedWheelTimer timer = new HashedWheelTimer();
@@ -205,7 +206,7 @@ public void testResourceCleanup() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("");
initializeEventLoopGroup(conf);
- try (ConnectionPool connectionPool = new ConnectionPool(conf, eventLoopGroup)) {
+ try (ConnectionPool connectionPool = new ConnectionPool(InstrumentProvider.NOOP, conf, eventLoopGroup)) {
assertThrows(() -> new PulsarClientImpl(conf, eventLoopGroup, connectionPool));
} finally {
// Externally passed eventLoopGroup should not be shutdown.
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java
index 91ad321048226..b01fbcb879f80 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java
@@ -36,6 +36,7 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;
@@ -46,6 +47,7 @@ public class UnAckedMessageTrackerTest {
public void testAddAndRemove() {
PulsarClientImpl client = mock(PulsarClientImpl.class);
ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(client.instrumentProvider()).thenReturn(InstrumentProvider.NOOP);
when(client.getCnxPool()).thenReturn(connectionPool);
Timer timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()),
1, TimeUnit.MILLISECONDS);
@@ -86,6 +88,7 @@ public void testAddAndRemove() {
public void testTrackChunkedMessageId() {
PulsarClientImpl client = mock(PulsarClientImpl.class);
ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(client.instrumentProvider()).thenReturn(InstrumentProvider.NOOP);
when(client.getCnxPool()).thenReturn(connectionPool);
Timer timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()),
1, TimeUnit.MILLISECONDS);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ClientConfigurationDataTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ClientConfigurationDataTest.java
index 5856395566a67..27f521ef1ff73 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ClientConfigurationDataTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ClientConfigurationDataTest.java
@@ -22,7 +22,14 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
+import io.opentelemetry.api.OpenTelemetry;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import lombok.Cleanup;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.testng.Assert;
import org.testng.annotations.Test;
/**
@@ -36,10 +43,35 @@ public void testDoNotPrintSensitiveInfo() throws JsonProcessingException {
clientConfigurationData.setTlsTrustStorePassword("xxxx");
clientConfigurationData.setSocks5ProxyPassword("yyyy");
clientConfigurationData.setAuthentication(new AuthenticationToken("zzzz"));
+ clientConfigurationData.setOpenTelemetry(OpenTelemetry.noop());
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
String serializedConf = objectMapper.writeValueAsString(clientConfigurationData);
assertThat(serializedConf).doesNotContain("xxxx", "yyyy", "zzzz");
}
+ @Test
+ public void testSerializable() throws Exception {
+ ClientConfigurationData conf = new ClientConfigurationData();
+ conf.setConnectionsPerBroker(3);
+ conf.setTlsTrustStorePassword("xxxx");
+ conf.setOpenTelemetry(OpenTelemetry.noop());
+
+ @Cleanup
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ @Cleanup
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+ oos.writeObject(conf);
+ byte[] serialized = bos.toByteArray();
+
+ // Deserialize
+ @Cleanup
+ ByteArrayInputStream bis = new ByteArrayInputStream(serialized);
+ @Cleanup
+ ObjectInputStream ois = new ObjectInputStream(bis);
+ Object object = ois.readObject();
+
+ Assert.assertEquals(object.getClass(), ClientConfigurationData.class);
+ Assert.assertEquals(object, conf);
+ }
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
index 782454022b1ed..d15d48b9209d0 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
@@ -26,6 +26,7 @@
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
import org.apache.pulsar.common.protocol.Commands;
@@ -47,7 +48,7 @@ public class ProxyClientCnx extends ClientCnx {
public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,
String clientAuthMethod, int protocolVersion,
boolean forwardClientAuthData, ProxyConnection proxyConnection) {
- super(conf, eventLoopGroup, protocolVersion);
+ super(InstrumentProvider.NOOP, conf, eventLoopGroup, protocolVersion);
this.clientAuthRole = clientAuthRole;
this.clientAuthMethod = clientAuthMethod;
this.forwardClientAuthData = forwardClientAuthData;
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index ba9247a085dff..594d6cbc3bb59 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -59,6 +59,7 @@
import org.apache.pulsar.client.impl.PulsarChannelInitializer;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.CommandAuthResponse;
@@ -383,11 +384,12 @@ private synchronized void completeConnect() throws PulsarClientException {
service.getConfiguration().isForwardAuthorizationCredentials(), this);
} else {
clientCnxSupplier =
- () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise);
+ () -> new ClientCnx(InstrumentProvider.NOOP, clientConf, service.getWorkerGroup(),
+ protocolVersionToAdvertise);
}
if (this.connectionPool == null) {
- this.connectionPool = new ConnectionPool(clientConf, service.getWorkerGroup(),
+ this.connectionPool = new ConnectionPool(InstrumentProvider.NOOP, clientConf, service.getWorkerGroup(),
clientCnxSupplier,
Optional.of(dnsAddressResolverGroup.getResolver(service.getWorkerGroup().next())));
} else {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
index 3f58250e6d68a..1a9459619ebe9 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java
@@ -42,6 +42,7 @@
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
@@ -251,8 +252,8 @@ private static PulsarClient getClientActiveConsumerChangeNotSupported(ClientConf
final EventLoopGroup eventLoopGroup)
throws Exception {
- ConnectionPool cnxPool = new ConnectionPool(conf, eventLoopGroup, () -> {
- return new ClientCnx(conf, eventLoopGroup, ProtocolVersion.v11_VALUE) {
+ ConnectionPool cnxPool = new ConnectionPool(InstrumentProvider.NOOP, conf, eventLoopGroup, () -> {
+ return new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup, ProtocolVersion.v11_VALUE) {
@Override
protected void handleActiveConsumerChange(CommandActiveConsumerChange change) {
throw new UnsupportedOperationException();
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index 9bc12dcc6fcb2..e1e49f9e8c5f2 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -50,6 +50,7 @@
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
@@ -374,8 +375,8 @@ private PulsarClient getClientActiveConsumerChangeNotSupported(ClientConfigurati
EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), false, threadFactory);
registerCloseable(() -> eventLoopGroup.shutdownNow());
- ConnectionPool cnxPool = new ConnectionPool(conf, eventLoopGroup, () -> {
- return new ClientCnx(conf, eventLoopGroup, ProtocolVersion.v11_VALUE) {
+ ConnectionPool cnxPool = new ConnectionPool(InstrumentProvider.NOOP, conf, eventLoopGroup, () -> {
+ return new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup, ProtocolVersion.v11_VALUE) {
@Override
protected void handleActiveConsumerChange(CommandActiveConsumerChange change) {
throw new UnsupportedOperationException();
diff --git a/pulsar-testclient/pom.xml b/pulsar-testclient/pom.xml
index ecc12b2e563d5..ec32b57be15f4 100644
--- a/pulsar-testclient/pom.xml
+++ b/pulsar-testclient/pom.xml
@@ -112,6 +112,23 @@
jackson-databind
+
+ io.opentelemetry
+ opentelemetry-exporter-prometheus
+
+
+ io.opentelemetry
+ opentelemetry-exporter-otlp
+
+
+ io.opentelemetry
+ opentelemetry-sdk
+
+
+ io.opentelemetry
+ opentelemetry-sdk-extension-autoconfigure
+
+
org.awaitility
awaitility
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
index 3b44023ef503e..b6b3d805edc75 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.testclient;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import java.lang.management.ManagementFactory;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@@ -76,7 +77,9 @@ public static ClientBuilder createClientBuilderFromArguments(PerformanceBaseArgu
.listenerThreads(arguments.listenerThreads)
.tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath)
.maxLookupRequests(arguments.maxLookupRequest)
- .proxyServiceUrl(arguments.proxyServiceURL, arguments.proxyProtocol);
+ .proxyServiceUrl(arguments.proxyServiceURL, arguments.proxyProtocol)
+ .openTelemetry(AutoConfiguredOpenTelemetrySdk.builder()
+ .build().getOpenTelemetrySdk());
if (isNotBlank(arguments.authPluginClassName)) {
clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams);