Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-379: Snapshot hash range assignments only in AUTO_SPLIT ordered mode #23423

Merged
merged 1 commit into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReadWriteLock;
Expand All @@ -44,21 +45,32 @@ public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyCons

private final int numberOfPoints;
private final Range keyHashRange;
private final boolean addOrRemoveReturnsImpactedConsumersResult;
private ConsumerHashAssignmentsSnapshot consumerHashAssignmentsSnapshot;

public ConsistentHashingStickyKeyConsumerSelector(int numberOfPoints) {
this(numberOfPoints, DEFAULT_RANGE_SIZE - 1);
this(numberOfPoints, false);
}

public ConsistentHashingStickyKeyConsumerSelector(int numberOfPoints, int rangeMaxValue) {
public ConsistentHashingStickyKeyConsumerSelector(int numberOfPoints,
boolean addOrRemoveReturnsImpactedConsumersResult) {
this(numberOfPoints, addOrRemoveReturnsImpactedConsumersResult, DEFAULT_RANGE_SIZE - 1);
}

public ConsistentHashingStickyKeyConsumerSelector(int numberOfPoints,
boolean addOrRemoveReturnsImpactedConsumersResult,
int rangeMaxValue) {
this.addOrRemoveReturnsImpactedConsumersResult = addOrRemoveReturnsImpactedConsumersResult;
this.hashRing = new TreeMap<>();
this.numberOfPoints = numberOfPoints;
this.keyHashRange = Range.of(STICKY_KEY_HASH_NOT_SET + 1, rangeMaxValue);
this.consumerHashAssignmentsSnapshot = ConsumerHashAssignmentsSnapshot.empty();
this.consumerHashAssignmentsSnapshot = addOrRemoveReturnsImpactedConsumersResult
? ConsumerHashAssignmentsSnapshot.empty()
: null;
}

@Override
public CompletableFuture<ImpactedConsumersResult> addConsumer(Consumer consumer) {
public CompletableFuture<Optional<ImpactedConsumersResult>> addConsumer(Consumer consumer) {
rwLock.writeLock().lock();
try {
ConsumerIdentityWrapper consumerIdentityWrapper = new ConsumerIdentityWrapper(consumer);
Expand All @@ -76,11 +88,14 @@ public CompletableFuture<ImpactedConsumersResult> addConsumer(Consumer consumer)
consumerNameIndexTracker.decreaseConsumerRefCount(removed);
}
}
if (!addOrRemoveReturnsImpactedConsumersResult) {
return CompletableFuture.completedFuture(Optional.empty());
}
ConsumerHashAssignmentsSnapshot assignmentsAfter = internalGetConsumerHashAssignmentsSnapshot();
ImpactedConsumersResult impactedConsumers =
consumerHashAssignmentsSnapshot.resolveImpactedConsumers(assignmentsAfter);
consumerHashAssignmentsSnapshot = assignmentsAfter;
return CompletableFuture.completedFuture(impactedConsumers);
return CompletableFuture.completedFuture(Optional.of(impactedConsumers));
} finally {
rwLock.writeLock().unlock();
}
Expand All @@ -103,7 +118,7 @@ private int calculateHashForConsumerAndIndex(Consumer consumer, int consumerName
}

@Override
public ImpactedConsumersResult removeConsumer(Consumer consumer) {
public Optional<ImpactedConsumersResult> removeConsumer(Consumer consumer) {
rwLock.writeLock().lock();
try {
ConsumerIdentityWrapper consumerIdentityWrapper = new ConsumerIdentityWrapper(consumer);
Expand All @@ -117,11 +132,14 @@ public ImpactedConsumersResult removeConsumer(Consumer consumer) {
}
}
}
if (!addOrRemoveReturnsImpactedConsumersResult) {
return Optional.empty();
}
ConsumerHashAssignmentsSnapshot assignmentsAfter = internalGetConsumerHashAssignmentsSnapshot();
ImpactedConsumersResult impactedConsumers =
consumerHashAssignmentsSnapshot.resolveImpactedConsumers(assignmentsAfter);
consumerHashAssignmentsSnapshot = assignmentsAfter;
return impactedConsumers;
return Optional.of(impactedConsumers);
} finally {
rwLock.writeLock().unlock();
}
Expand Down Expand Up @@ -155,7 +173,8 @@ public Range getKeyHashRange() {
public ConsumerHashAssignmentsSnapshot getConsumerHashAssignmentsSnapshot() {
rwLock.readLock().lock();
try {
return consumerHashAssignmentsSnapshot;
return consumerHashAssignmentsSnapshot != null ? consumerHashAssignmentsSnapshot
: internalGetConsumerHashAssignmentsSnapshot();
} finally {
rwLock.readLock().unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
Expand Down Expand Up @@ -59,13 +60,20 @@ public class HashRangeAutoSplitStickyKeyConsumerSelector implements StickyKeyCon
private final Range keyHashRange;
private final ConcurrentSkipListMap<Integer, Consumer> rangeMap;
private final Map<Consumer, Integer> consumerRange;
private final boolean addOrRemoveReturnsImpactedConsumersResult;
private ConsumerHashAssignmentsSnapshot consumerHashAssignmentsSnapshot;

public HashRangeAutoSplitStickyKeyConsumerSelector() {
this(DEFAULT_RANGE_SIZE);
this(false);
}

public HashRangeAutoSplitStickyKeyConsumerSelector(int rangeSize) {
public HashRangeAutoSplitStickyKeyConsumerSelector(boolean addOrRemoveReturnsImpactedConsumersResult) {
this(DEFAULT_RANGE_SIZE, addOrRemoveReturnsImpactedConsumersResult);
}

public HashRangeAutoSplitStickyKeyConsumerSelector(int rangeSize,
boolean addOrRemoveReturnsImpactedConsumersResult) {
this.addOrRemoveReturnsImpactedConsumersResult = addOrRemoveReturnsImpactedConsumersResult;
if (rangeSize < 2) {
throw new IllegalArgumentException("range size must greater than 2");
}
Expand All @@ -76,11 +84,12 @@ public HashRangeAutoSplitStickyKeyConsumerSelector(int rangeSize) {
this.consumerRange = new HashMap<>();
this.rangeSize = rangeSize;
this.keyHashRange = Range.of(0, rangeSize - 1);
this.consumerHashAssignmentsSnapshot = ConsumerHashAssignmentsSnapshot.empty();
this.consumerHashAssignmentsSnapshot = addOrRemoveReturnsImpactedConsumersResult
? ConsumerHashAssignmentsSnapshot.empty() : null;
}

@Override
public synchronized CompletableFuture<ImpactedConsumersResult> addConsumer(Consumer consumer) {
public synchronized CompletableFuture<Optional<ImpactedConsumersResult>> addConsumer(Consumer consumer) {
if (rangeMap.isEmpty()) {
rangeMap.put(rangeSize, consumer);
consumerRange.put(consumer, rangeSize);
Expand All @@ -91,15 +100,18 @@ public synchronized CompletableFuture<ImpactedConsumersResult> addConsumer(Consu
return CompletableFuture.failedFuture(e);
}
}
if (!addOrRemoveReturnsImpactedConsumersResult) {
return CompletableFuture.completedFuture(Optional.empty());
}
ConsumerHashAssignmentsSnapshot assignmentsAfter = internalGetConsumerHashAssignmentsSnapshot();
ImpactedConsumersResult impactedConsumers =
consumerHashAssignmentsSnapshot.resolveImpactedConsumers(assignmentsAfter);
consumerHashAssignmentsSnapshot = assignmentsAfter;
return CompletableFuture.completedFuture(impactedConsumers);
return CompletableFuture.completedFuture(Optional.of(impactedConsumers));
}

@Override
public synchronized ImpactedConsumersResult removeConsumer(Consumer consumer) {
public synchronized Optional<ImpactedConsumersResult> removeConsumer(Consumer consumer) {
Integer removeRange = consumerRange.remove(consumer);
if (removeRange != null) {
if (removeRange == rangeSize && rangeMap.size() > 1) {
Expand All @@ -111,11 +123,14 @@ public synchronized ImpactedConsumersResult removeConsumer(Consumer consumer) {
rangeMap.remove(removeRange);
}
}
if (!addOrRemoveReturnsImpactedConsumersResult) {
return Optional.empty();
}
ConsumerHashAssignmentsSnapshot assignmentsAfter = internalGetConsumerHashAssignmentsSnapshot();
ImpactedConsumersResult impactedConsumers =
consumerHashAssignmentsSnapshot.resolveImpactedConsumers(assignmentsAfter);
consumerHashAssignmentsSnapshot = assignmentsAfter;
return impactedConsumers;
return Optional.of(impactedConsumers);
}

@Override
Expand All @@ -134,7 +149,8 @@ public Range getKeyHashRange() {

@Override
public synchronized ConsumerHashAssignmentsSnapshot getConsumerHashAssignmentsSnapshot() {
return consumerHashAssignmentsSnapshot;
return consumerHashAssignmentsSnapshot != null ? consumerHashAssignmentsSnapshot
: internalGetConsumerHashAssignmentsSnapshot();
}

private ConsumerHashAssignmentsSnapshot internalGetConsumerHashAssignmentsSnapshot() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.pulsar.client.api.Range;
Expand All @@ -38,7 +39,6 @@ public class HashRangeExclusiveStickyKeyConsumerSelector implements StickyKeyCon
private final int rangeSize;
private final Range keyHashRange;
private final ConcurrentSkipListMap<Integer, Consumer> rangeMap;
private ConsumerHashAssignmentsSnapshot consumerHashAssignmentsSnapshot;

public HashRangeExclusiveStickyKeyConsumerSelector() {
this(DEFAULT_RANGE_SIZE);
Expand All @@ -52,11 +52,10 @@ public HashRangeExclusiveStickyKeyConsumerSelector(int rangeSize) {
this.rangeSize = rangeSize;
this.keyHashRange = Range.of(0, rangeSize - 1);
this.rangeMap = new ConcurrentSkipListMap<>();
this.consumerHashAssignmentsSnapshot = ConsumerHashAssignmentsSnapshot.empty();
}

@Override
public synchronized CompletableFuture<ImpactedConsumersResult> addConsumer(Consumer consumer) {
public synchronized CompletableFuture<Optional<ImpactedConsumersResult>> addConsumer(Consumer consumer) {
return validateKeySharedMeta(consumer).thenApply(__ -> {
try {
return internalAddConsumer(consumer);
Expand All @@ -66,7 +65,7 @@ public synchronized CompletableFuture<ImpactedConsumersResult> addConsumer(Consu
});
}

private synchronized ImpactedConsumersResult internalAddConsumer(Consumer consumer)
private synchronized Optional<ImpactedConsumersResult> internalAddConsumer(Consumer consumer)
throws BrokerServiceException.ConsumerAssignException {
Consumer conflictingConsumer = findConflictingConsumer(consumer.getKeySharedMeta().getHashRangesList());
if (conflictingConsumer != null) {
Expand All @@ -77,29 +76,17 @@ private synchronized ImpactedConsumersResult internalAddConsumer(Consumer consum
rangeMap.put(intRange.getStart(), consumer);
rangeMap.put(intRange.getEnd(), consumer);
}
ConsumerHashAssignmentsSnapshot assignmentsAfter = internalGetConsumerHashAssignmentsSnapshot();
ImpactedConsumersResult impactedConsumers =
consumerHashAssignmentsSnapshot.resolveImpactedConsumers(assignmentsAfter);
consumerHashAssignmentsSnapshot = assignmentsAfter;
return impactedConsumers;
return Optional.empty();
}

@Override
public synchronized ImpactedConsumersResult removeConsumer(Consumer consumer) {
public synchronized Optional<ImpactedConsumersResult> removeConsumer(Consumer consumer) {
rangeMap.entrySet().removeIf(entry -> entry.getValue().equals(consumer));
ConsumerHashAssignmentsSnapshot assignmentsAfter = internalGetConsumerHashAssignmentsSnapshot();
ImpactedConsumersResult impactedConsumers =
consumerHashAssignmentsSnapshot.resolveImpactedConsumers(assignmentsAfter);
consumerHashAssignmentsSnapshot = assignmentsAfter;
return impactedConsumers;
return Optional.empty();
}

@Override
public synchronized ConsumerHashAssignmentsSnapshot getConsumerHashAssignmentsSnapshot() {
return consumerHashAssignmentsSnapshot;
}

private ConsumerHashAssignmentsSnapshot internalGetConsumerHashAssignmentsSnapshot() {
List<HashRangeAssignment> result = new ArrayList<>();
Map.Entry<Integer, Consumer> prev = null;
for (Map.Entry<Integer, Consumer> entry: rangeMap.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.Range;

Expand Down Expand Up @@ -50,7 +51,7 @@ public interface StickyKeyConsumerSelector {
* The result contains information about the existing consumers whose hash ranges were affected
* by the addition of the new consumer.
*/
CompletableFuture<ImpactedConsumersResult> addConsumer(Consumer consumer);
CompletableFuture<Optional<ImpactedConsumersResult>> addConsumer(Consumer consumer);

/**
* Remove the consumer.
Expand All @@ -59,7 +60,7 @@ public interface StickyKeyConsumerSelector {
* @return the result of impacted consumers. The result contains information about the existing consumers
* whose hash ranges were affected by the removal of the consumer.
*/
ImpactedConsumersResult removeConsumer(Consumer consumer);
Optional<ImpactedConsumersResult> removeConsumer(Consumer consumer);

/**
* Select a consumer by sticky key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
case AUTO_SPLIT:
if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
selector = new ConsistentHashingStickyKeyConsumerSelector(
conf.getSubscriptionKeySharedConsistentHashingReplicaPoints());
conf.getSubscriptionKeySharedConsistentHashingReplicaPoints(), drainingHashesRequired);
} else {
selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
selector = new HashRangeAutoSplitStickyKeyConsumerSelector(drainingHashesRequired);
}
break;
case STICKY:
Expand Down Expand Up @@ -155,7 +155,7 @@ public void endBatch() {
drainingHashesTracker.endBatch();
}
});
registerDrainingHashes(consumer, impactedConsumers);
registerDrainingHashes(consumer, impactedConsumers.orElseThrow());
}
}).exceptionally(ex -> {
internalRemoveConsumer(consumer);
Expand Down Expand Up @@ -184,13 +184,13 @@ private synchronized void registerDrainingHashes(Consumer skipConsumer,
@Override
public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
// The consumer must be removed from the selector before calling the superclass removeConsumer method.
ImpactedConsumersResult impactedConsumers = selector.removeConsumer(consumer);
Optional<ImpactedConsumersResult> impactedConsumers = selector.removeConsumer(consumer);
super.removeConsumer(consumer);
if (drainingHashesRequired) {
// register draining hashes for the impacted consumers and ranges, in case a hash switched from one
// consumer to another. This will handle the case where a hash gets switched from an existing
// consumer to another existing consumer during removal.
registerDrainingHashes(consumer, impactedConsumers);
registerDrainingHashes(consumer, impactedConsumers.orElseThrow());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,8 @@ public void testShouldNotChangeSelectedConsumerWhenConsumerIsAdded() {

@Test
public void testShouldContainMinimalMappingChangesWhenConsumerLeavesAndRejoins() {
final ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(100);
final ConsistentHashingStickyKeyConsumerSelector selector =
new ConsistentHashingStickyKeyConsumerSelector(100, true);
final String consumerName = "consumer";
final int numOfInitialConsumers = 10;
List<Consumer> consumers = new ArrayList<>();
Expand Down Expand Up @@ -563,7 +564,8 @@ public void testPerformanceOfAdding1000ConsumersWith100Points() {
// test that adding 1000 consumers with 100 points runs in a reasonable time.
// This takes about 1 second on Apple M3
// this unit test can be used for basic profiling
final ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(100);
final ConsistentHashingStickyKeyConsumerSelector selector =
new ConsistentHashingStickyKeyConsumerSelector(100, true);
for (int i = 0; i < 1000; i++) {
// use real class to avoid Mockito over head
final Consumer consumer = new Consumer("consumer" + i, 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public class HashRangeAutoSplitStickyKeyConsumerSelectorTest {

@Test
public void testGetConsumerKeyHashRanges() throws BrokerServiceException.ConsumerAssignException {
HashRangeAutoSplitStickyKeyConsumerSelector selector = new HashRangeAutoSplitStickyKeyConsumerSelector(2 << 5);
HashRangeAutoSplitStickyKeyConsumerSelector selector =
new HashRangeAutoSplitStickyKeyConsumerSelector(2 << 5, false);
List<String> consumerName = Arrays.asList("consumer1", "consumer2", "consumer3", "consumer4");
List<Consumer> consumers = new ArrayList<>();
for (String s : consumerName) {
Expand All @@ -61,7 +62,8 @@ public void testGetConsumerKeyHashRanges() throws BrokerServiceException.Consume

@Test
public void testGetConsumerKeyHashRangesWithSameConsumerName() throws Exception {
HashRangeAutoSplitStickyKeyConsumerSelector selector = new HashRangeAutoSplitStickyKeyConsumerSelector(2 << 5);
HashRangeAutoSplitStickyKeyConsumerSelector selector =
new HashRangeAutoSplitStickyKeyConsumerSelector(2 << 5, false);
final String consumerName = "My-consumer";
List<Consumer> consumers = new ArrayList<>();
for (int i = 0; i < 3; i++) {
Expand Down
Loading