Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Remove ConcurrentOpenHashMap and ConcurrentOpenHashSet #37

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Runnables;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
Expand Down Expand Up @@ -1207,8 +1206,7 @@ public void calculateCursorBacklogs(final TopicName topicName,
BookKeeper bk = getBookKeeper().get();
final CountDownLatch allCursorsCounter = new CountDownLatch(1);
final long errorInReadingCursor = -1;
ConcurrentOpenHashMap<String, Long> ledgerRetryMap =
ConcurrentOpenHashMap.<String, Long>newBuilder().build();
final var ledgerRetryMap = new ConcurrentHashMap<String, Long>();

final MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ledgers.lastEntry().getValue();
final Position lastLedgerPosition =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@
import org.apache.pulsar.common.stats.MetricsUtil;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
Expand Down Expand Up @@ -150,7 +149,7 @@ public class NamespaceService implements AutoCloseable {
public static final String HEARTBEAT_NAMESPACE_FMT_V2 = "pulsar/%s";
public static final String SLA_NAMESPACE_FMT = SLA_NAMESPACE_PROPERTY + "/%s/%s";

private final ConcurrentOpenHashMap<ClusterDataImpl, PulsarClientImpl> namespaceClients;
private final Map<ClusterDataImpl, PulsarClientImpl> namespaceClients = new ConcurrentHashMap<>();

private final List<NamespaceBundleOwnershipListener> bundleOwnershipListeners;

Expand Down Expand Up @@ -204,8 +203,6 @@ public NamespaceService(PulsarService pulsar) {
this.loadManager = pulsar.getLoadManager();
this.bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
this.ownershipCache = new OwnershipCache(pulsar, this);
this.namespaceClients =
ConcurrentOpenHashMap.<ClusterDataImpl, PulsarClientImpl>newBuilder().build();
this.bundleOwnershipListeners = new CopyOnWriteArrayList<>();
this.bundleSplitListeners = new CopyOnWriteArrayList<>();
this.localBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class);
Expand Down Expand Up @@ -461,16 +458,10 @@ public boolean registerNamespace(NamespaceName nsname, boolean ensureOwned) thro
}
}

private final ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>>
findingBundlesAuthoritative =
ConcurrentOpenHashMap.<NamespaceBundle,
CompletableFuture<Optional<LookupResult>>>newBuilder()
.build();
private final ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>>
findingBundlesNotAuthoritative =
ConcurrentOpenHashMap.<NamespaceBundle,
CompletableFuture<Optional<LookupResult>>>newBuilder()
.build();
private final Map<NamespaceBundle, CompletableFuture<Optional<LookupResult>>>
findingBundlesAuthoritative = new ConcurrentHashMap<>();
private final Map<NamespaceBundle, CompletableFuture<Optional<LookupResult>>>
findingBundlesNotAuthoritative = new ConcurrentHashMap<>();

/**
* Main internal method to lookup and setup ownership of service unit to a broker.
Expand All @@ -485,7 +476,7 @@ private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
LOG.debug("findBrokerServiceUrl: {} - options: {}", bundle, options);
}

ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>> targetMap;
Map<NamespaceBundle, CompletableFuture<Optional<LookupResult>>> targetMap;
if (options.isAuthoritative()) {
targetMap = findingBundlesAuthoritative;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.pulsar.broker.service.Topic.PublishContext;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -101,20 +100,12 @@ public MessageDupUnknownException() {
// Map that contains the highest sequenceId that have been sent by each producers. The map will be updated before
// the messages are persisted
@VisibleForTesting
final ConcurrentOpenHashMap<String, Long> highestSequencedPushed =
ConcurrentOpenHashMap.<String, Long>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
final Map<String, Long> highestSequencedPushed = new ConcurrentHashMap<>();

// Map that contains the highest sequenceId that have been persistent by each producers. The map will be updated
// after the messages are persisted
@VisibleForTesting
final ConcurrentOpenHashMap<String, Long> highestSequencedPersisted =
ConcurrentOpenHashMap.<String, Long>newBuilder()
.expectedItems(16)
.concurrencyLevel(1)
.build();
final Map<String, Long> highestSequencedPersisted = new ConcurrentHashMap<>();

// Number of persisted entries after which to store a snapshot of the sequence ids map
private final int snapshotInterval;
Expand Down Expand Up @@ -434,7 +425,7 @@ public void resetHighestSequenceIdPushed() {
}

highestSequencedPushed.clear();
for (String producer : highestSequencedPersisted.keys()) {
for (String producer : highestSequencedPersisted.keySet()) {
highestSequencedPushed.put(producer, highestSequencedPersisted.get(producer));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,22 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;

/**
*/
public class ClusterReplicationMetrics {
private final List<Metrics> metricsList;
private final String localCluster;
private final ConcurrentOpenHashMap<String, ReplicationMetrics> metricsMap;
private final Map<String, ReplicationMetrics> metricsMap = new ConcurrentHashMap<>();
public static final String SEPARATOR = "_";
public final boolean metricsEnabled;

public ClusterReplicationMetrics(String localCluster, boolean metricsEnabled) {
metricsList = new ArrayList<>();
this.localCluster = localCluster;
metricsMap = ConcurrentOpenHashMap.<String, ReplicationMetrics>newBuilder()
.build();
this.metricsEnabled = metricsEnabled;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.broker.qos.AsyncTokenBucket;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.compaction.CompactionServiceFactory;
import org.awaitility.Awaitility;
import org.testng.Assert;
Expand Down Expand Up @@ -230,9 +229,7 @@ public void testInactiveProducerRemove() throws Exception {
messageDeduplication.purgeInactiveProducers();
assertFalse(inactiveProducers.containsKey(producerName2));
assertFalse(inactiveProducers.containsKey(producerName3));
field = MessageDeduplication.class.getDeclaredField("highestSequencedPushed");
field.setAccessible(true);
ConcurrentOpenHashMap<String, Long> highestSequencedPushed = (ConcurrentOpenHashMap<String, Long>) field.get(messageDeduplication);
final var highestSequencedPushed = messageDeduplication.highestSequencedPushed;

assertEquals((long) highestSequencedPushed.get(producerName1), 2L);
assertFalse(highestSequencedPushed.containsKey(producerName2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -67,7 +68,6 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -88,7 +88,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
protected final ExecutorService internalPinnedExecutor;
protected UnAckedMessageTracker unAckedMessageTracker;
final GrowableArrayBlockingQueue<Message<T>> incomingMessages;
protected ConcurrentOpenHashMap<MessageIdAdv, MessageIdImpl[]> unAckedChunkedMessageIdSequenceMap;
protected Map<MessageIdAdv, MessageIdImpl[]> unAckedChunkedMessageIdSequenceMap = new ConcurrentHashMap<>();
protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
protected final int maxReceiverQueueSize;
private volatile int currentReceiverQueueSize;
Expand Down Expand Up @@ -138,8 +138,6 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
this.consumerEventListener = conf.getConsumerEventListener();
// Always use growable queue since items can exceed the advertised size
this.incomingMessages = new GrowableArrayBlockingQueue<>();
this.unAckedChunkedMessageIdSequenceMap =
ConcurrentOpenHashMap.<MessageIdAdv, MessageIdImpl[]>newBuilder().build();
this.executorProvider = executorProvider;
this.messageListenerExecutor = conf.getMessageListenerExecutor() == null
? (conf.getSubscriptionType() == SubscriptionType.Key_Shared
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@
import org.apache.pulsar.common.util.SafeCollectionUtils;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -207,8 +206,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle

protected volatile boolean paused;

protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap =
ConcurrentOpenHashMap.<String, ChunkedMessageCtx>newBuilder().build();
protected Map<String, ChunkedMessageCtx> chunkedMessagesMap = new ConcurrentHashMap<>();
private int pendingChunkedMessageCount = 0;
protected long expireTimeOfIncompleteChunkedMessageMillis = 0;
private final AtomicBoolean expireChunkMessageTaskScheduled = new AtomicBoolean(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -52,15 +54,14 @@
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PartitionedProducerImpl<T> extends ProducerBase<T> {

private static final Logger log = LoggerFactory.getLogger(PartitionedProducerImpl.class);

private final ConcurrentOpenHashMap<Integer, ProducerImpl<T>> producers;
private final Map<Integer, ProducerImpl<T>> producers = new ConcurrentHashMap<>();
private final MessageRouter routerPolicy;
private final PartitionedTopicProducerStatsRecorderImpl stats;
private TopicMetadata topicMetadata;
Expand All @@ -76,8 +77,6 @@ public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerCo
int numPartitions, CompletableFuture<Producer<T>> producerCreatedFuture,
Schema<T> schema, ProducerInterceptors interceptors) {
super(client, topic, conf, producerCreatedFuture, schema, interceptors);
this.producers =
ConcurrentOpenHashMap.<Integer, ProducerImpl<T>>newBuilder().build();
this.topicMetadata = new TopicMetadataImpl(numPartitions);
this.routerPolicy = getMessageRouter();
stats = client.getConfiguration().getStatsIntervalSeconds() > 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
package org.apache.pulsar.client.impl;

import static com.google.common.base.Preconditions.checkArgument;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
Expand All @@ -32,15 +34,14 @@
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.protocol.schema.SchemaHash;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;

public abstract class ProducerBase<T> extends HandlerState implements Producer<T> {

protected final CompletableFuture<Producer<T>> producerCreatedFuture;
protected final ProducerConfigurationData conf;
protected final Schema<T> schema;
protected final ProducerInterceptors interceptors;
protected final ConcurrentOpenHashMap<SchemaHash, byte[]> schemaCache;
protected final Map<SchemaHash, byte[]> schemaCache = new ConcurrentHashMap<>();
protected volatile MultiSchemaMode multiSchemaMode = MultiSchemaMode.Auto;

protected ProducerBase(PulsarClientImpl client, String topic, ProducerConfigurationData conf,
Expand All @@ -50,8 +51,6 @@ protected ProducerBase(PulsarClientImpl client, String topic, ProducerConfigurat
this.conf = conf;
this.schema = schema;
this.interceptors = interceptors;
this.schemaCache =
ConcurrentOpenHashMap.<SchemaHash, byte[]>newBuilder().build();
if (!conf.isMultiSchema()) {
multiSchemaMode = MultiSchemaMode.Disabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.pulsar.client.util.TimedCompletableFuture;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand All @@ -62,8 +61,7 @@ public class AcknowledgementsGroupingTrackerTest {
public void setup() throws NoSuchFieldException, IllegalAccessException {
eventLoopGroup = new NioEventLoopGroup(1);
consumer = mock(ConsumerImpl.class);
consumer.unAckedChunkedMessageIdSequenceMap =
ConcurrentOpenHashMap.<MessageIdAdv, MessageIdImpl[]>newBuilder().build();
consumer.unAckedChunkedMessageIdSequenceMap = new ConcurrentHashMap<>();
cnx = spy(new ClientCnxTest(new ClientConfigurationData(), eventLoopGroup));
PulsarClientImpl client = mock(PulsarClientImpl.class);
ConnectionPool connectionPool = mock(ConnectionPool.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,11 @@
import io.netty.util.concurrent.DefaultThreadFactory;
import java.time.Duration;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -113,8 +111,7 @@ public void testTrackChunkedMessageId() {
ChunkMessageIdImpl chunkedMessageId =
new ChunkMessageIdImpl(chunkMsgIds[0], chunkMsgIds[chunkMsgIds.length - 1]);

consumer.unAckedChunkedMessageIdSequenceMap =
ConcurrentOpenHashMap.<MessageIdAdv, MessageIdImpl[]>newBuilder().build();
consumer.unAckedChunkedMessageIdSequenceMap = new ConcurrentHashMap<>();
consumer.unAckedChunkedMessageIdSequenceMap.put(chunkedMessageId, chunkMsgIds);

// Redeliver chunked message
Expand Down
Loading
Loading