diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EnableProxyProtocolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EnableProxyProtocolTest.java index b6f34a44429d9..e8493d41960b6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EnableProxyProtocolTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EnableProxyProtocolTest.java @@ -19,9 +19,18 @@ package org.apache.pulsar.broker.service; import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import java.util.concurrent.TimeUnit; import lombok.Cleanup; -import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.policies.data.SubscriptionStats; @@ -32,10 +41,6 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.net.InetSocketAddress; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; - @Test(groups = "broker") public class EnableProxyProtocolTest extends BrokerTestBase { @@ -46,6 +51,15 @@ protected void setup() throws Exception { super.baseSetup(); } + protected PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException { + ClientBuilder clientBuilder = + PulsarClient.builder() + .serviceUrl(url) + .statsInterval(intervalInSecs, TimeUnit.SECONDS); + customizeNewPulsarClientBuilder(clientBuilder); + return createNewPulsarClient(clientBuilder); + } + @AfterClass(alwaysRun = true) @Override protected void cleanup() throws Exception { @@ -53,7 +67,7 @@ protected void cleanup() throws Exception { } @Test - public void testSimpleProduceAndConsume() throws PulsarClientException { + public void testSimpleProduceAndConsume() throws Exception { final String namespace = "prop/ns-abc"; final String topicName = "persistent://" + namespace + "/testSimpleProduceAndConsume"; final String subName = "my-subscriber-name"; @@ -76,30 +90,104 @@ public void testSimpleProduceAndConsume() throws PulsarClientException { } Assert.assertEquals(received, messages); + + // cleanup. + org.apache.pulsar.broker.service.Consumer serverConsumer = pulsar.getBrokerService().getTopicReference(topicName) + .get().getSubscription(subName).getConsumers().get(0); + ((ServerCnx) serverConsumer.cnx()).close(); + consumer.close(); + producer.close(); + admin.topics().delete(topicName); } @Test - public void testProxyProtocol() throws PulsarClientException, ExecutionException, InterruptedException, PulsarAdminException { + public void testProxyProtocol() throws Exception { final String namespace = "prop/ns-abc"; final String topicName = "persistent://" + namespace + "/testProxyProtocol"; final String subName = "my-subscriber-name"; - PulsarClientImpl client = (PulsarClientImpl) pulsarClient; - CompletableFuture cnx = client.getCnxPool().getConnection(InetSocketAddress.createUnresolved("localhost", pulsar.getBrokerListenPort().get())); - // Simulate the proxy protcol message - cnx.get().ctx().channel().writeAndFlush(Unpooled.copiedBuffer("PROXY TCP4 198.51.100.22 203.0.113.7 35646 80\r\n".getBytes())); - pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) - .subscribe(); - org.apache.pulsar.broker.service.Consumer c = pulsar.getBrokerService().getTopicReference(topicName).get().getSubscription(subName).getConsumers().get(0); - Awaitility.await().untilAsserted(() -> Assert.assertTrue(c.cnx().hasHAProxyMessage())); + + // Create a client that injected the protocol implementation. + ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString()); + PulsarClientImpl protocolClient = InjectedClientCnxClientBuilder.create(clientBuilder, + (conf, eventLoopGroup) -> new ClientCnx(conf, eventLoopGroup) { + public void channelActive(ChannelHandlerContext ctx) throws Exception { + byte[] bs = "PROXY TCP4 198.51.100.22 203.0.113.7 35646 80\r\n".getBytes(); + ctx.writeAndFlush(Unpooled.copiedBuffer(bs)); + super.channelActive(ctx); + } + }); + + // Verify the addr can be handled correctly. + testPubAndSub(topicName, subName, "198.51.100.22:35646", protocolClient); + + // cleanup. + admin.topics().delete(topicName); + } + + @Test(timeOut = 10000) + public void testPubSubWhenSlowNetwork() throws Exception { + final String namespace = "prop/ns-abc"; + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp"); + final String subName = "my-subscriber-name"; + + // Create a client that injected the protocol implementation. + ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString()); + PulsarClientImpl protocolClient = InjectedClientCnxClientBuilder.create(clientBuilder, + (conf, eventLoopGroup) -> new ClientCnx(conf, eventLoopGroup) { + public void channelActive(ChannelHandlerContext ctx) throws Exception { + Thread task = new Thread(() -> { + try { + byte[] bs1 = "PROXY".getBytes(); + byte[] bs2 = " TCP4 198.51.100.22 203.0.113.7 35646 80\r\n".getBytes(); + ctx.writeAndFlush(Unpooled.copiedBuffer(bs1)); + Thread.sleep(100); + ctx.writeAndFlush(Unpooled.copiedBuffer(bs2)); + super.channelActive(ctx); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + task.start(); + } + }); + + // Verify the addr can be handled correctly. + testPubAndSub(topicName, subName, "198.51.100.22:35646", protocolClient); + + // cleanup. + admin.topics().delete(topicName); + } + + private void testPubAndSub(String topicName, String subName, String expectedHostAndPort, + PulsarClientImpl pulsarClient) throws Exception { + // Verify: subscribe + org.apache.pulsar.client.api.Consumer clientConsumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName) + .subscriptionName(subName).subscribe(); + org.apache.pulsar.broker.service.Consumer serverConsumer = pulsar.getBrokerService() + .getTopicReference(topicName).get().getSubscription(subName).getConsumers().get(0); + Awaitility.await().untilAsserted(() -> Assert.assertTrue(serverConsumer.cnx().hasHAProxyMessage())); TopicStats topicStats = admin.topics().getStats(topicName); Assert.assertEquals(topicStats.getSubscriptions().size(), 1); SubscriptionStats subscriptionStats = topicStats.getSubscriptions().get(subName); Assert.assertEquals(subscriptionStats.getConsumers().size(), 1); - Assert.assertEquals(subscriptionStats.getConsumers().get(0).getAddress(), "198.51.100.22:35646"); + Assert.assertEquals(subscriptionStats.getConsumers().get(0).getAddress(), expectedHostAndPort); + + // Verify: producer register. + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); + TopicStats topicStats2 = admin.topics().getStats(topicName); + Assert.assertEquals(topicStats2.getPublishers().size(), 1); + Assert.assertEquals(topicStats2.getPublishers().get(0).getAddress(), expectedHostAndPort); + + // Verify: Pub & Sub + producer.send("1"); + Message msg = clientConsumer.receive(2, TimeUnit.SECONDS); + Assert.assertNotNull(msg); + Assert.assertEquals(msg.getValue(), "1"); + clientConsumer.acknowledge(msg); - pulsarClient.newProducer().topic(topicName).create(); - topicStats = admin.topics().getStats(topicName); - Assert.assertEquals(topicStats.getPublishers().size(), 1); - Assert.assertEquals(topicStats.getPublishers().get(0).getAddress(), "198.51.100.22:35646"); + // cleanup. + ((ServerCnx) serverConsumer.cnx()).close(); + producer.close(); + clientConsumer.close(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InjectedClientCnxClientBuilder.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InjectedClientCnxClientBuilder.java new file mode 100644 index 0000000000000..557cb415dbe27 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InjectedClientCnxClientBuilder.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import io.netty.channel.EventLoopGroup; +import java.util.concurrent.ThreadFactory; +import org.apache.pulsar.client.impl.ClientBuilderImpl; +import org.apache.pulsar.client.impl.ClientCnx; +import org.apache.pulsar.client.impl.ConnectionPool; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.util.ExecutorProvider; +import org.apache.pulsar.common.util.netty.EventLoopUtil; + +public class InjectedClientCnxClientBuilder { + + public static PulsarClientImpl create(final ClientBuilderImpl clientBuilder, + final ClientCnxFactory clientCnxFactory) throws Exception { + ClientConfigurationData conf = clientBuilder.getClientConfigurationData(); + ThreadFactory threadFactory = new ExecutorProvider + .ExtendedThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon()); + EventLoopGroup eventLoopGroup = + EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), conf.isEnableBusyWait(), threadFactory); + + // Inject into ClientCnx. + ConnectionPool pool = new ConnectionPool(conf, eventLoopGroup, + () -> clientCnxFactory.generate(conf, eventLoopGroup)); + + return new PulsarClientImpl(conf, eventLoopGroup, pool); + } + + public interface ClientCnxFactory { + + ClientCnx generate(ClientConfigurationData conf, EventLoopGroup eventLoopGroup); + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/OptionalProxyProtocolDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/OptionalProxyProtocolDecoder.java index 96ed750617eb5..6e24cfa8d2e60 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/OptionalProxyProtocolDecoder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/OptionalProxyProtocolDecoder.java @@ -19,36 +19,63 @@ package org.apache.pulsar.common.protocol; import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.ProtocolDetectionResult; import io.netty.handler.codec.ProtocolDetectionState; import io.netty.handler.codec.haproxy.HAProxyMessageDecoder; import io.netty.handler.codec.haproxy.HAProxyProtocolVersion; +import lombok.extern.slf4j.Slf4j; /** * Decoder that added whether a new connection is prefixed with the ProxyProtocol. * More about the ProxyProtocol see: http://www.haproxy.org/download/1.8/doc/proxy-protocol.txt. */ +@Slf4j public class OptionalProxyProtocolDecoder extends ChannelInboundHandlerAdapter { public static final String NAME = "optional-proxy-protocol-decoder"; + public static final int MIN_BYTES_SIZE_TO_DETECT_PROTOCOL = 12; + + private CompositeByteBuf cumulatedByteBuf; + @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { - ProtocolDetectionResult result = - HAProxyMessageDecoder.detectProtocol((ByteBuf) msg); - // should accumulate data if need more data to detect the protocol + // Combine cumulated buffers. + ByteBuf buf = (ByteBuf) msg; + if (cumulatedByteBuf != null) { + buf = cumulatedByteBuf.addComponent(true, buf); + } + + ProtocolDetectionResult result = HAProxyMessageDecoder.detectProtocol(buf); if (result.state() == ProtocolDetectionState.NEEDS_MORE_DATA) { + // Accumulate data if need more data to detect the protocol. + if (cumulatedByteBuf == null) { + cumulatedByteBuf = new CompositeByteBuf(ctx.alloc(), false, MIN_BYTES_SIZE_TO_DETECT_PROTOCOL, buf); + } return; } + cumulatedByteBuf = null; if (result.state() == ProtocolDetectionState.DETECTED) { ctx.pipeline().addAfter(NAME, null, new HAProxyMessageDecoder()); - ctx.pipeline().remove(this); } + ctx.pipeline().remove(this); + super.channelRead(ctx, buf); + } else { + super.channelRead(ctx, msg); + } + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); + if (cumulatedByteBuf != null) { + log.info("Release cumulated byte buffer when channel inactive."); + cumulatedByteBuf = null; } - super.channelRead(ctx, msg); } }