Skip to content

Commit

Permalink
[improve] PIP-342: OTel client metrics support (#22179)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Mar 28, 2024
1 parent d8903da commit 6b29382
Show file tree
Hide file tree
Showing 52 changed files with 1,476 additions and 264 deletions.
4 changes: 4 additions & 0 deletions distribution/shell/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,10 @@ The Apache Software License, Version 2.0
- log4j-core-2.18.0.jar
- log4j-slf4j-impl-2.18.0.jar
- log4j-web-2.18.0.jar
* OpenTelemetry
- opentelemetry-api-1.34.1.jar
- opentelemetry-context-1.34.1.jar
- opentelemetry-extension-incubator-1.34.1-alpha.jar

* BookKeeper
- bookkeeper-common-allocator-4.16.4.jar
Expand Down
6 changes: 6 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-batch-discovery-triggerers</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
import org.apache.pulsar.common.naming.NamespaceBundle;
Expand Down Expand Up @@ -997,7 +998,8 @@ public void testLookupThrottlingForClientByClient() throws Exception {
// Using an AtomicReference in order to reset a new CountDownLatch
AtomicReference<CountDownLatch> latchRef = new AtomicReference<>();
latchRef.set(new CountDownLatch(1));
try (ConnectionPool pool = new ConnectionPool(conf, eventLoop, () -> new ClientCnx(conf, eventLoop) {
try (ConnectionPool pool = new ConnectionPool(InstrumentProvider.NOOP, conf, eventLoop,
() -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoop) {
@Override
protected void handleLookupResponse(CommandLookupTopicResponse lookupResult) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -197,7 +198,7 @@ public void testLookupThrottlingForClientByBroker() throws Exception {
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(20, false,
new DefaultThreadFactory("test-pool", Thread.currentThread().isDaemon()));
ExecutorService executor = Executors.newFixedThreadPool(10);
try (ConnectionPool pool = new ConnectionPool(conf, eventLoop)) {
try (ConnectionPool pool = new ConnectionPool(InstrumentProvider.NOOP, conf, eventLoop)) {
final int totalConsumers = 20;
List<Future<?>> futures = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -99,7 +100,7 @@ public void testProxyProtocol() throws Exception {
ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
@Cleanup
PulsarClientImpl protocolClient = InjectedClientCnxClientBuilder.create(clientBuilder,
(conf, eventLoopGroup) -> new ClientCnx(conf, eventLoopGroup) {
(conf, eventLoopGroup) -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {
public void channelActive(ChannelHandlerContext ctx) throws Exception {
byte[] bs = "PROXY TCP4 198.51.100.22 203.0.113.7 35646 80\r\n".getBytes();
ctx.writeAndFlush(Unpooled.copiedBuffer(bs));
Expand All @@ -124,7 +125,7 @@ public void testPubSubWhenSlowNetwork() throws Exception {
ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
@Cleanup
PulsarClientImpl protocolClient = InjectedClientCnxClientBuilder.create(clientBuilder,
(conf, eventLoopGroup) -> new ClientCnx(conf, eventLoopGroup) {
(conf, eventLoopGroup) -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Thread task = new Thread(() -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.util.netty.EventLoopUtil;

Expand All @@ -42,7 +43,7 @@ public static PulsarClientImpl create(final ClientBuilderImpl clientBuilder,
EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), conf.isEnableBusyWait(), threadFactory);

// Inject into ClientCnx.
ConnectionPool pool = new ConnectionPool(conf, eventLoopGroup,
ConnectionPool pool = new ConnectionPool(InstrumentProvider.NOOP, conf, eventLoopGroup,
() -> clientCnxFactory.generate(conf, eventLoopGroup));

return new InjectedClientCnxPulsarClientImpl(conf, eventLoopGroup, pool);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
Expand Down Expand Up @@ -137,7 +138,7 @@ private void doFindBrokerWithListenerName(boolean useHttp) throws Exception {
conf.setMaxLookupRedirects(10);

@Cleanup
LookupService lookupService = useHttp ? new HttpLookupService(conf, eventExecutors) :
LookupService lookupService = useHttp ? new HttpLookupService(InstrumentProvider.NOOP, conf, eventExecutors) :
new BinaryProtoLookupService((PulsarClientImpl) this.pulsarClient,
lookupUrl.toString(), "internal", false, this.executorService);
TopicName topicName = TopicName.get("persistent://public/default/test");
Expand Down Expand Up @@ -172,7 +173,7 @@ public void testHttpLookupRedirect() throws Exception {
conf.setMaxLookupRedirects(10);

@Cleanup
HttpLookupService lookupService = new HttpLookupService(conf, eventExecutors);
HttpLookupService lookupService = new HttpLookupService(InstrumentProvider.NOOP, conf, eventExecutors);
NamespaceService namespaceService = pulsar.getNamespaceService();

LookupResult lookupResult = new LookupResult(pulsar.getWebServiceAddress(), null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -68,7 +69,8 @@ protected void cleanup() throws Exception {
public void testSingleIpAddress() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("test"));
ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop);
ConnectionPool pool =
spyWithClassAndConstructorArgs(ConnectionPool.class, InstrumentProvider.NOOP, conf, eventLoop);
conf.setServiceUrl(serviceUrl);
PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool);

Expand Down Expand Up @@ -118,7 +120,7 @@ public void testSelectConnectionForSameProducer() throws Exception {
public void testDoubleIpAddress() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("test"));
ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop);
ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, InstrumentProvider.NOOP, conf, eventLoop);
conf.setServiceUrl(serviceUrl);
PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool);

Expand All @@ -143,7 +145,8 @@ public void testNoConnectionPool() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setConnectionsPerBroker(0);
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(8, false, new DefaultThreadFactory("test"));
ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop);
ConnectionPool pool =
spyWithClassAndConstructorArgs(ConnectionPool.class, InstrumentProvider.NOOP, conf, eventLoop);

InetSocketAddress brokerAddress =
InetSocketAddress.createUnresolved("127.0.0.1", brokerPort);
Expand All @@ -166,7 +169,8 @@ public void testEnableConnectionPool() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setConnectionsPerBroker(5);
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(8, false, new DefaultThreadFactory("test"));
ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop);
ConnectionPool pool =
spyWithClassAndConstructorArgs(ConnectionPool.class, InstrumentProvider.NOOP, conf, eventLoop);

InetSocketAddress brokerAddress =
InetSocketAddress.createUnresolved("127.0.0.1", brokerPort);
Expand Down Expand Up @@ -233,8 +237,10 @@ protected void doResolveAll(SocketAddress socketAddress, Promise promise) throws
}
};

ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop,
(Supplier<ClientCnx>) () -> new ClientCnx(conf, eventLoop), Optional.of(resolver));
ConnectionPool pool =
spyWithClassAndConstructorArgs(ConnectionPool.class, InstrumentProvider.NOOP, conf, eventLoop,
(Supplier<ClientCnx>) () -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoop),
Optional.of(resolver));


ClientCnx cnx = pool.getConnection(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.protocol.ByteBufPair;
Expand Down Expand Up @@ -233,7 +234,7 @@ public void testTamperingMessageIsDetected() throws Exception {
// WHEN
// protocol message is created with checksum
ByteBufPair cmd = Commands.newSend(1, 1, 1, ChecksumType.Crc32c, msgMetadata, payload);
OpSendMsg op = OpSendMsg.create((MessageImpl<byte[]>) msgBuilder.getMessage(), cmd, 1, null);
OpSendMsg op = OpSendMsg.create(LatencyHistogram.NOOP, (MessageImpl<byte[]>) msgBuilder.getMessage(), cmd, 1, null);

// THEN
// the checksum validation passes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.MessageImpl.SchemaState;
import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.protocol.ByteBufPair;
Expand Down Expand Up @@ -499,7 +500,7 @@ public void testExpireIncompleteChunkMessage() throws Exception{
ByteBufPair cmd = Commands.newSend(producerId, 1, 1, ChecksumType.Crc32c, msgMetadata, payload);
MessageImpl msgImpl = ((MessageImpl<byte[]>) msg.getMessage());
msgImpl.setSchemaState(SchemaState.Ready);
OpSendMsg op = OpSendMsg.create(msgImpl, cmd, 1, null);
OpSendMsg op = OpSendMsg.create(LatencyHistogram.NOOP, msgImpl, cmd, 1, null);
producer.processOpSendMsg(op);

retryStrategically((test) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -811,7 +812,7 @@ public void testPreciseRegexpSubscribeDisabledTopicWatcher(boolean partitioned)
private PulsarClient createDelayWatchTopicsClient() throws Exception {
ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
return InjectedClientCnxClientBuilder.create(clientBuilder,
(conf, eventLoopGroup) -> new ClientCnx(conf, eventLoopGroup) {
(conf, eventLoopGroup) -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {
public CompletableFuture<CommandWatchTopicListSuccess> newWatchTopicList(
BaseCommand command, long requestId) {
// Inject 2 seconds delay when sending command New Watch Topics.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.awaitility.Awaitility;

Expand Down Expand Up @@ -79,7 +80,7 @@ public static PulsarTestClient create(ClientBuilder clientBuilder) throws Pulsar
new DefaultThreadFactory("pulsar-test-client-io", Thread.currentThread().isDaemon()));

AtomicReference<Supplier<ClientCnx>> clientCnxSupplierReference = new AtomicReference<>();
ConnectionPool connectionPool = new ConnectionPool(clientConfigurationData, eventLoopGroup,
ConnectionPool connectionPool = new ConnectionPool(InstrumentProvider.NOOP, clientConfigurationData, eventLoopGroup,
() -> clientCnxSupplierReference.get().get());

return new PulsarTestClient(clientConfigurationData, eventLoopGroup, connectionPool,
Expand All @@ -101,7 +102,7 @@ private PulsarTestClient(ClientConfigurationData conf, EventLoopGroup eventLoopG
* @return new ClientCnx instance
*/
protected ClientCnx createClientCnx() {
return new ClientCnx(conf, eventLoopGroup) {
return new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {
@Override
public int getRemoteEndpointProtocolVersion() {
return overrideRemoteEndpointProtocolVersion != 0
Expand Down
Loading

0 comments on commit 6b29382

Please sign in to comment.