From 752bfa1cdd472b94ff7d5e87451fcc208e763b89 Mon Sep 17 00:00:00 2001 From: dockerzhang Date: Thu, 10 Dec 2020 16:54:59 +0800 Subject: [PATCH] add kop address cache to prevent query zk each time (#257) improvement for [#256](https://github.com/streamnative/kop/issues/256) --- .../pulsar/handlers/kop/KafkaProtocolHandler.java | 4 ++-- .../pulsar/handlers/kop/KafkaRequestHandler.java | 9 +++++++-- .../pulsar/handlers/kop/KafkaTopicManager.java | 15 ++++++++++----- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java index 120fd3546d1e2..524cbc6b74145 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java @@ -98,7 +98,7 @@ public void onLoad(NamespaceBundle bundle) { if (Topic.GROUP_METADATA_TOPIC_NAME.equals(getKafkaTopicNameFromPulsarTopicname(name))) { checkState(name.isPartitioned(), "OffsetTopic should be partitioned in onLoad, but get " + name); - KafkaTopicManager.removeLookupCache(name.toString()); + KafkaTopicManager.removeTopicManagerCache(name.toString()); if (log.isDebugEnabled()) { log.debug("New offset partition load: {}, broker: {}", @@ -129,7 +129,7 @@ public void unLoad(NamespaceBundle bundle) { if (Topic.GROUP_METADATA_TOPIC_NAME.equals(getKafkaTopicNameFromPulsarTopicname(name))) { checkState(name.isPartitioned(), "OffsetTopic should be partitioned in unLoad, but get " + name); - KafkaTopicManager.removeLookupCache(name.toString()); + KafkaTopicManager.removeTopicManagerCache(name.toString()); if (log.isDebugEnabled()) { log.debug("Offset partition unload: {}, broker: {}", diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 6434a4c88f521..f0bcd0dcaaf9e 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -1224,7 +1224,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { log.error("[{}] failed get pulsar address, returned null.", topic.toString()); // getTopicBroker returns null. topic should be removed from LookupCache. - topicManager.removeLookupCache(topic.toString()); + topicManager.removeTopicManagerCache(topic.toString()); returnFuture.complete(Optional.empty()); return returnFuture; @@ -1235,6 +1235,10 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { topic, pulsarAddress); } + // get kop address from cache to prevent query zk each time. + if (topicManager.KOP_ADDRESS_CACHE.containsKey(topic.toString())) { + return topicManager.KOP_ADDRESS_CACHE.get(topic.toString()); + } // advertised data is write in /loadbalance/brokers/advertisedAddress:webServicePort // here we get the broker url, need to find related webServiceUrl. ZooKeeperCache zkCache = pulsarService.getLocalZkCache(); @@ -1290,6 +1294,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { } if (lookupDataContainsAddress(data, hostAndPort)) { + topicManager.KOP_ADDRESS_CACHE.put(topic.toString(), returnFuture); returnFuture.complete(data.getProtocol(KafkaProtocolHandler.PROTOCOL_NAME)); return; } @@ -1358,7 +1363,7 @@ private CompletableFuture findBroker(TopicName topic) { // here we found topic broker: broker2, but this is in broker1, // how to clean the lookup cache? if (!localListeners.contains(kopBrokerUrl)) { - topicManager.removeLookupCache(topic.toString()); + topicManager.removeTopicManagerCache(topic.toString()); } if (!topicManager.topicExists(topic.toString()) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java index 48ba56afcb919..f1040fc31d598 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java @@ -17,6 +17,7 @@ import java.net.InetSocketAddress; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; @@ -70,6 +71,9 @@ public class KafkaTopicManager { public static final ConcurrentHashMap> LOOKUP_CACHE = new ConcurrentHashMap<>(); + public static final ConcurrentHashMap>> + KOP_ADDRESS_CACHE = new ConcurrentHashMap<>(); + KafkaTopicManager(KafkaRequestHandler kafkaRequestHandler) { this.requestHandler = kafkaRequestHandler; this.pulsarService = kafkaRequestHandler.getPulsarService(); @@ -128,8 +132,9 @@ public CompletableFuture getTopicConsumerManager(Stri ); } - public static void removeLookupCache(String topicName) { + public static void removeTopicManagerCache(String topicName) { LOOKUP_CACHE.remove(topicName); + KOP_ADDRESS_CACHE.remove(topicName); } // whether topic exists in cache. @@ -273,7 +278,7 @@ public CompletableFuture getTopic(String topicName) { // get topic broker returns null. topic should be removed from LookupCache. if (ignore == null) { - removeLookupCache(topicName); + removeTopicManagerCache(topicName); } topicCompletableFuture.complete(null); @@ -290,7 +295,7 @@ public CompletableFuture getTopic(String topicName) { log.error("[{}] Failed to getTopic {}. exception:", requestHandler.ctx.channel(), t, throwable); // failed to getTopic from current broker, remove cache, which added in getTopicBroker. - removeLookupCache(t); + removeTopicManagerCache(t); topicCompletableFuture.complete(null); return; } @@ -351,7 +356,7 @@ public synchronized void close() { for (Map.Entry> entry : topics.entrySet()) { String topicName = entry.getKey(); - removeLookupCache(topicName); + removeTopicManagerCache(topicName); CompletableFuture topicFuture = entry.getValue(); if (log.isDebugEnabled()) { log.debug("[{}] remove producer {} for topic {} at close()", @@ -376,7 +381,7 @@ public Producer getReferenceProducer(String topicName) { public void deReference(String topicName) { try { - removeLookupCache(topicName); + removeTopicManagerCache(topicName); if (consumerTopicManagers.containsKey(topicName)) { CompletableFuture manager = consumerTopicManagers.get(topicName);