Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] PIP-342: OTel client metrics support #22179

Merged
merged 26 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
babad52
WIP: PIP-342: OTel client metrics support
merlimat Feb 29, 2024
8e184be
Addressed comments
merlimat Mar 4, 2024
4d0120b
Removed MetricsCardinality from API
merlimat Mar 5, 2024
252196e
Fixed tests code
merlimat Mar 5, 2024
4b31eb1
Merge remote-tracking branch 'apache/master' into client-otel
merlimat Mar 5, 2024
445231c
Ensure single-initialization of InstrumentProvider
merlimat Mar 5, 2024
e840d06
Added unit tests
merlimat Mar 6, 2024
6cc759a
Merge remote-tracking branch 'apache/master' into client-otel
merlimat Mar 6, 2024
a0cb78d
Addressed more comments
merlimat Mar 8, 2024
5af2018
Share the histogram objects
merlimat Mar 9, 2024
218f57b
Added javadoc and reset old metrics default
merlimat Mar 11, 2024
bd1fb3f
Added missing license header
merlimat Mar 11, 2024
59bf64b
Removed trailing spaces
merlimat Mar 11, 2024
857c0a3
Fixed checkstyle
merlimat Mar 11, 2024
e103369
Checkstyle
merlimat Mar 11, 2024
1f2d0c8
Checkstyle
merlimat Mar 11, 2024
413360c
Fixed spotbugs
merlimat Mar 11, 2024
345f240
Fixed compilation
merlimat Mar 11, 2024
0b60e7d
Fix
merlimat Mar 11, 2024
70dd347
Fixed test
merlimat Mar 12, 2024
8fe109f
Merge remote-tracking branch 'apache/master' into client-otel
merlimat Mar 13, 2024
d90f621
Fixed tests mock
merlimat Mar 14, 2024
22d46af
Fixed test
merlimat Mar 14, 2024
a9f3857
Attach subscription attribute to consumer metrics
merlimat Mar 14, 2024
980bae3
More test fix
merlimat Mar 14, 2024
545a7d3
Merge remote-tracking branch 'apache/master' into client-otel
merlimat Mar 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;

import io.opentelemetry.api.OpenTelemetry;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
Expand All @@ -44,6 +45,7 @@
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
Expand Down Expand Up @@ -137,7 +139,7 @@ private void doFindBrokerWithListenerName(boolean useHttp) throws Exception {
conf.setMaxLookupRedirects(10);

@Cleanup
LookupService lookupService = useHttp ? new HttpLookupService(conf, eventExecutors) :
LookupService lookupService = useHttp ? new HttpLookupService(new InstrumentProvider(new ClientConfigurationData()), conf, eventExecutors) :
merlimat marked this conversation as resolved.
Show resolved Hide resolved
new BinaryProtoLookupService((PulsarClientImpl) this.pulsarClient,
lookupUrl.toString(), "internal", false, this.executorService);
TopicName topicName = TopicName.get("persistent://public/default/test");
Expand Down Expand Up @@ -172,7 +174,7 @@ public void testHttpLookupRedirect() throws Exception {
conf.setMaxLookupRedirects(10);

@Cleanup
HttpLookupService lookupService = new HttpLookupService(conf, eventExecutors);
HttpLookupService lookupService = new HttpLookupService(new InstrumentProvider(new ClientConfigurationData()), conf, eventExecutors);
NamespaceService namespaceService = pulsar.getNamespaceService();

LookupResult lookupResult = new LookupResult(pulsar.getWebServiceAddress(), null,
Expand Down
6 changes: 6 additions & 0 deletions pulsar-client-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@
<artifactId>protobuf-java</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;

import io.opentelemetry.api.OpenTelemetry;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.time.Clock;
Expand Down Expand Up @@ -451,15 +452,18 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit);

/**
* Set the interval between each stat info <i>(default: 60 seconds)</i> Stats will be activated with positive
* Set the interval between each stat info <i>(default: disabled)</i>. Stats will be activated with a positive
merlimat marked this conversation as resolved.
Show resolved Hide resolved
* statsInterval. It should be set to at least 1 second.
*
* @param statsInterval
* the interval between each stat info
* @param unit
* time unit for {@code statsInterval}
* @return the client builder instance
*
* @deprecated use {@link #openTelemetry(OpenTelemetry)} to enable stats
*/
@Deprecated
ClientBuilder statsInterval(long statsInterval, TimeUnit unit);

/**
Expand Down Expand Up @@ -554,6 +558,10 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
*/
ClientBuilder enableBusyWait(boolean enableBusyWait);

/**
 * Configure the {@link OpenTelemetry} instance for the client being built.
 *
 * <p>This is the replacement referenced by the deprecation notes on
 * {@link #statsInterval(long, TimeUnit)}, {@link ConsumerStats} and {@link ProducerStats}.
 *
 * @param openTelemetry
 *            the OpenTelemetry instance to hand to the client
 * @return the client builder instance
 */
ClientBuilder openTelemetry(OpenTelemetry openTelemetry);
merlimat marked this conversation as resolved.
Show resolved Hide resolved
merlimat marked this conversation as resolved.
Show resolved Hide resolved

/**
 * Select how finely the client's OpenTelemetry metrics are labelled.
 *
 * <p>See {@link MetricsCardinality} for the available levels (no extra labels, tenant, namespace, topic,
 * partition).
 *
 * @param metricsCardinality
 *            the labelling level to store in the client configuration
 * @return the client builder instance
 */
ClientBuilder openTelemetryMetricsCardinality(MetricsCardinality metricsCardinality);
merlimat marked this conversation as resolved.
Show resolved Hide resolved

/**
* The clock used by the pulsar client.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;

import io.opentelemetry.api.OpenTelemetry;
import java.io.Serializable;
import java.util.Collections;
import java.util.Map;
Expand All @@ -29,9 +30,12 @@
*
* <p>All the stats are relative to the last recording period. The interval of the stats refreshes is configured with
* {@link ClientBuilder#statsInterval(long, java.util.concurrent.TimeUnit)} with a default of 1 minute.
*
* @deprecated use {@link ClientBuilder#openTelemetry(OpenTelemetry)} to enable stats
asafm marked this conversation as resolved.
Show resolved Hide resolved
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
@InterfaceStability.Evolving
@Deprecated
public interface ConsumerStats extends Serializable {

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.client.api;

/**
 * Labelling granularity for client metrics, as selected through
 * {@code ClientBuilder#openTelemetryMetricsCardinality(MetricsCardinality)}.
 *
 * <p>Constants are declared from the least detailed level ({@link #None}) to the most
 * detailed one ({@link #Partition}).
 */
public enum MetricsCardinality {

    /** Metrics carry no additional labels. */
    None,

    /** Metrics are labelled with the tenant. */
    Tenant,

    /** Metrics are labelled with the tenant and the namespace. */
    Namespace,

    /** Metrics are labelled with the topic. */
    Topic,

    /** Metrics are labelled with each individual partition. */
    Partition
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;

import io.opentelemetry.api.OpenTelemetry;
import java.io.Serializable;
import java.util.Collections;
import java.util.Map;
Expand All @@ -29,9 +30,12 @@
*
* <p>All the stats are relative to the last recording period. The interval of the stats refreshes is configured with
* {@link ClientBuilder#statsInterval(long, java.util.concurrent.TimeUnit)} with a default of 1 minute.
*
* @deprecated use {@link ClientBuilder#openTelemetry(OpenTelemetry)} to enable stats
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
@InterfaceStability.Evolving
@Deprecated
public interface ProducerStats extends Serializable {
/**
* @return the number of messages published in the last interval
Expand Down
5 changes: 5 additions & 0 deletions pulsar-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@
<classifier>pkg</classifier>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>

<dependency>
<!-- this module only need by the real client, client inside broker no need this module -->
<groupId>${project.groupId}</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public OpSendMsg createOpSendMsg() throws IOException {
// Because when invoke `ProducerImpl.processOpSendMsg` on flush,
// if `op.msg != null && isBatchMessagingEnabled()` checks true, it will call `batchMessageAndSend` to flush
// messageContainers before publishing this one-batch message.
op = OpSendMsg.create(messages, cmd, messageMetadata.getSequenceId(), firstCallback,
op = OpSendMsg.create(producer, messages, cmd, messageMetadata.getSequenceId(), firstCallback,
batchAllocatedSizeBytes);

// NumMessagesInBatch and BatchSizeByte will not be serialized to the binary cmd. It's just useful for the
Expand Down Expand Up @@ -314,7 +314,7 @@ public OpSendMsg createOpSendMsg() throws IOException {
messageMetadata.getUncompressedSize(), encryptedPayload.readableBytes());
}

OpSendMsg op = OpSendMsg.create(messages, cmd, messageMetadata.getSequenceId(),
OpSendMsg op = OpSendMsg.create(producer, messages, cmd, messageMetadata.getSequenceId(),
messageMetadata.getHighestSequenceId(), firstCallback, batchAllocatedSizeBytes);

op.setNumMessagesInBatch(numMessagesInBatch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import static java.lang.String.format;
import io.netty.buffer.ByteBuf;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
Expand All @@ -34,6 +36,7 @@
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType;
Expand Down Expand Up @@ -63,6 +66,11 @@ public class BinaryProtoLookupService implements LookupService {
private final ConcurrentHashMap<TopicName, CompletableFuture<PartitionedTopicMetadata>>
partitionedMetadataInProgress = new ConcurrentHashMap<>();

private final LatencyHistogram histoGetBroker;
private final LatencyHistogram histoGetTopicMetadata;
private final LatencyHistogram histoGetSchema;
private final LatencyHistogram histoListTopics;

public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
boolean useTls,
Expand All @@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
this.serviceNameResolver = new PulsarServiceNameResolver();
this.listenerName = listenerName;
updateServiceUrl(serviceUrl);

Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");

histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
merlimat marked this conversation as resolved.
Show resolved Hide resolved
asafm marked this conversation as resolved.
Show resolved Hide resolved
merlimat marked this conversation as resolved.
Show resolved Hide resolved
"Lookup operations",
attrs.toBuilder().put("type", "topic").build());
asafm marked this conversation as resolved.
Show resolved Hide resolved
histoGetTopicMetadata = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
"Lookup operations",
asafm marked this conversation as resolved.
Show resolved Hide resolved
attrs.toBuilder().put("type", "metadata").build());
merlimat marked this conversation as resolved.
Show resolved Hide resolved
histoGetSchema = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
"Lookup operations",
attrs.toBuilder().put("type", "schema").build());
histoListTopics = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
"Lookup operations",
attrs.toBuilder().put("type", "list-topics").build());
}

@Override
Expand All @@ -99,12 +122,20 @@ public void updateServiceUrl(String serviceUrl) throws PulsarClientException {
* @return broker-socket-address that serves given topic
*/
public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) {
long startTime = System.nanoTime();
final MutableObject<CompletableFuture> newFutureCreated = new MutableObject<>();
try {
return lookupInProgress.computeIfAbsent(topicName, tpName -> {
CompletableFuture<LookupTopicResult> newFuture =
findBroker(serviceNameResolver.resolveHost(), false, topicName, 0);
newFutureCreated.setValue(newFuture);

newFuture.thenRun(() -> {
histoGetBroker.recordSuccess(System.nanoTime() - startTime);
}).exceptionally(x -> {
histoGetBroker.recordFailure(System.nanoTime() - startTime);
return null;
});
return newFuture;
});
} finally {
Expand All @@ -121,12 +152,19 @@ public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) {
*
*/
public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName) {
long startTime = System.nanoTime();
final MutableObject<CompletableFuture> newFutureCreated = new MutableObject<>();
try {
return partitionedMetadataInProgress.computeIfAbsent(topicName, tpName -> {
CompletableFuture<PartitionedTopicMetadata> newFuture =
getPartitionedTopicMetadata(serviceNameResolver.resolveHost(), topicName);
newFutureCreated.setValue(newFuture);
newFuture.thenRun(() -> {
histoGetBroker.recordSuccess(System.nanoTime() - startTime);
merlimat marked this conversation as resolved.
Show resolved Hide resolved
}).exceptionally(x -> {
histoGetBroker.recordFailure(System.nanoTime() - startTime);
return null;
});
return newFuture;
});
} finally {
Expand Down Expand Up @@ -224,18 +262,21 @@ private CompletableFuture<LookupTopicResult> findBroker(InetSocketAddress socket
private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(InetSocketAddress socketAddress,
TopicName topicName) {

long startTime = System.nanoTime();
CompletableFuture<PartitionedTopicMetadata> partitionFuture = new CompletableFuture<>();

client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newPartitionMetadataRequest(topicName.toString(), requestId);
clientCnx.newLookup(request, requestId).whenComplete((r, t) -> {
if (t != null) {
histoGetTopicMetadata.recordFailure(System.nanoTime() - startTime);
log.warn("[{}] failed to get Partitioned metadata : {}", topicName,
t.getMessage(), t);
partitionFuture.completeExceptionally(t);
} else {
try {
histoGetTopicMetadata.recordSuccess(System.nanoTime() - startTime);
partitionFuture.complete(new PartitionedTopicMetadata(r.partitions));
} catch (Exception e) {
partitionFuture.completeExceptionally(new PulsarClientException.LookupException(
Expand Down Expand Up @@ -263,6 +304,7 @@ public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName) {

@Override
public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, byte[] version) {
long startTime = System.nanoTime();
CompletableFuture<Optional<SchemaInfo>> schemaFuture = new CompletableFuture<>();
if (version != null && version.length == 0) {
schemaFuture.completeExceptionally(new SchemaSerializationException("Empty schema version"));
Expand All @@ -275,10 +317,12 @@ public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, by
Optional.ofNullable(BytesSchemaVersion.of(version)));
clientCnx.sendGetSchema(request, requestId).whenComplete((r, t) -> {
if (t != null) {
histoGetSchema.recordFailure(System.nanoTime() - startTime);
log.warn("[{}] failed to get schema : {}", topicName,
t.getMessage(), t);
schemaFuture.completeExceptionally(t);
} else {
histoGetSchema.recordSuccess(System.nanoTime() - startTime);
schemaFuture.complete(r);
}
client.getCnxPool().releaseConnection(clientCnx);
Expand Down Expand Up @@ -326,15 +370,19 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
Mode mode,
String topicsPattern,
String topicsHash) {
long startTime = System.nanoTime();

client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newGetTopicsOfNamespaceRequest(
namespace.toString(), requestId, mode, topicsPattern, topicsHash);

clientCnx.newGetTopicsOfNamespace(request, requestId).whenComplete((r, t) -> {
if (t != null) {
histoListTopics.recordFailure(System.nanoTime() - startTime);
getTopicsResultFuture.completeExceptionally(t);
} else {
histoListTopics.recordSuccess(System.nanoTime() - startTime);
if (log.isDebugEnabled()) {
log.debug("[namespace: {}] Success get topics list in request: {}",
namespace, requestId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;

import static com.google.common.base.Preconditions.checkArgument;
import io.opentelemetry.api.OpenTelemetry;
import java.net.InetSocketAddress;
import java.time.Clock;
import java.util.List;
Expand All @@ -29,6 +30,7 @@
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.MetricsCardinality;
import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
Expand Down Expand Up @@ -121,6 +123,18 @@ public ClientBuilder authentication(Authentication authentication) {
return this;
}

@Override
public ClientBuilder openTelemetry(OpenTelemetry openTelemetry) {
conf.setOpenTelemetry(openTelemetry);
merlimat marked this conversation as resolved.
Show resolved Hide resolved
return this;
}

/**
 * Stores the requested metrics cardinality in the client configuration.
 *
 * @param metricsCardinality the labelling level to record on the configuration
 * @return this builder, to allow call chaining
 */
@Override
public ClientBuilder openTelemetryMetricsCardinality(MetricsCardinality metricsCardinality) {
// Only records the value on the configuration object; nothing else happens in this method.
conf.setOpenTelemetryMetricsCardinality(metricsCardinality);
return this;
}

@Override
public ClientBuilder authentication(String authPluginClassName, String authParamsString)
throws UnsupportedAuthenticationException {
Expand Down
Loading
Loading