Skip to content

Commit

Permalink
[improve][broker] Added ServiceUnitStateCompactionStrategy
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed Dec 24, 2022
1 parent d8569cd commit 3e7c2bf
Show file tree
Hide file tree
Showing 8 changed files with 1,207 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@
import org.apache.pulsar.common.util.ThreadDumpUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.StrategicTwoPhaseCompactor;
import org.apache.pulsar.compaction.TwoPhaseCompactor;
import org.apache.pulsar.functions.worker.ErrorNotifier;
import org.apache.pulsar.functions.worker.WorkerConfig;
Expand Down Expand Up @@ -198,6 +199,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private TopicPoliciesService topicPoliciesService = TopicPoliciesService.DISABLED;
private BookKeeperClientFactory bkClientFactory;
private Compactor compactor;
private StrategicTwoPhaseCompactor strategicCompactor;
private ResourceUsageTransportManager resourceUsageTransportManager;
private ResourceGroupService resourceGroupServiceManager;

Expand Down Expand Up @@ -1455,6 +1457,19 @@ public Compactor getNullableCompactor() {
return this.compactor;
}

public StrategicTwoPhaseCompactor newStrategicCompactor() throws PulsarServerException {
return new StrategicTwoPhaseCompactor(this.getConfiguration(),
getClient(), getBookKeeperClient(),
getCompactorExecutor());
}

public synchronized StrategicTwoPhaseCompactor getStrategicCompactor() throws PulsarServerException {
if (this.strategicCompactor == null) {
this.strategicCompactor = newStrategicCompactor();
}
return this.strategicCompactor;
}

protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImpl offloadPolicies) {
if (this.offloaderScheduler == null) {
this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,11 @@ public synchronized void start() throws PulsarServerException {
}
tableview = pulsar.getClient().newTableViewBuilder(schema)
.topic(TOPIC)
// TODO: enable CompactionStrategy
.loadConf(Map.of(
"topicCompactionStrategyClassName",
ServiceUnitStateCompactionStrategy.class.getName()))
.create();
// TODO: schedule listen instead of foreachAndListen
tableview.forEachAndListen((key, value) -> handle(key, value));
tableview.listen((key, value) -> handle(key, value));
log.debug("Successfully started the channel tableview.");

pulsar.getLocalMetadataStore().registerSessionListener(this::handleMetadataSessionEvent);
Expand Down Expand Up @@ -332,8 +333,6 @@ private void handle(String serviceUnit, ServiceUnitStateData data) {
}

ServiceUnitState state = data == null ? Free : data.state();

// TODO : Add state validation in tableview by the compaction strategy
switch (state) {
case Owned -> handleOwnEvent(serviceUnit, data);
case Assigned -> handleAssignEvent(serviceUnit, data);
Expand Down Expand Up @@ -619,14 +618,13 @@ private void doCleanup(String broker) {
|| StringUtils.equals(broker, stateData.sourceBroker())) {
log.info("Cleaning ownership serviceUnit:{}, stateData:{}.", serviceUnit, stateData);
tombstoneAsync(serviceUnit).whenComplete((__, e) -> {
if (e == null) {
serviceUnitTombstoneCnt.incrementAndGet();
} else {
if (e != null) {
log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}.",
serviceUnit, stateData);
serviceUnitTombstoneErrorCnt.incrementAndGet();
}
});
serviceUnitTombstoneCnt.incrementAndGet();
}
}

Expand Down Expand Up @@ -690,14 +688,13 @@ private void monitorOwnerships(List<String> brokers) {
serviceUnit, stateData);

tombstoneAsync(serviceUnit).whenComplete((__, e) -> {
if (e == null) {
serviceUnitTombstoneCnt.incrementAndGet();
} else {
if (e != null) {
log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}.",
serviceUnit, stateData);
serviceUnitTombstoneErrorCnt.incrementAndGet();
}
});
serviceUnitTombstoneCnt.incrementAndGet();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions.channel;

import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigned;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Free;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Released;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Splitting;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;

public class ServiceUnitStateCompactionStrategy implements TopicCompactionStrategy<ServiceUnitStateData> {

private final Schema<ServiceUnitStateData> schema;

private boolean checkBrokers = true;

public ServiceUnitStateCompactionStrategy() {
schema = JSONSchema.of(ServiceUnitStateData.class);
}

@Override
public Schema<ServiceUnitStateData> getSchema() {
return schema;
}

public void checkBrokers(boolean check) {
this.checkBrokers = check;
}

@Override
public boolean shouldKeepLeft(ServiceUnitStateData from, ServiceUnitStateData to) {
ServiceUnitState prevState = from == null ? Free : from.state();
ServiceUnitState state = to == null ? Free : to.state();
if (!ServiceUnitState.isValidTransition(prevState, state)) {
return true;
}

if (checkBrokers) {
if (prevState == Free && (state == Assigned || state == Owned)) {
// Free -> Assigned || Owned broker check
return StringUtils.isBlank(to.broker());
} else if (prevState == Owned && state == Assigned) {
// Owned -> Assigned(transfer) broker check
return !StringUtils.equals(from.broker(), to.sourceBroker())
|| StringUtils.isBlank(to.broker())
|| StringUtils.equals(from.broker(), to.broker());
} else if (prevState == Assigned && state == Released) {
// Assigned -> Released(transfer) broker check
return !StringUtils.equals(from.broker(), to.broker())
|| !StringUtils.equals(from.sourceBroker(), to.sourceBroker());
} else if (prevState == Released && state == Owned) {
// Released -> Owned(transfer) broker check
return !StringUtils.equals(from.broker(), to.broker())
|| !StringUtils.equals(from.sourceBroker(), to.sourceBroker());
} else if (prevState == Assigned && state == Owned) {
// Assigned -> Owned broker check
return !StringUtils.equals(from.broker(), to.broker())
|| !StringUtils.equals(from.sourceBroker(), to.sourceBroker());
} else if (prevState == Owned && state == Splitting) {
// Owned -> Splitting broker check
return !StringUtils.equals(from.broker(), to.broker());
}
}

return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateCompactionStrategy;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources;
import org.apache.pulsar.broker.service.AbstractReplicator;
Expand Down Expand Up @@ -151,6 +153,7 @@
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -202,6 +205,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
private CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(COMPACTION_NEVER_RUN);
private final CompactedTopic compactedTopic;

// TODO: Create compaction strategy from topic policy when exposing strategic compaction to users.
private static Map<String, TopicCompactionStrategy> strategicCompactionMap = Map.of(
ServiceUnitStateChannelImpl.TOPIC,
new ServiceUnitStateCompactionStrategy());

private CompletableFuture<MessageIdImpl> currentOffload = CompletableFuture.completedFuture(
(MessageIdImpl) MessageId.earliest);

Expand Down Expand Up @@ -1563,6 +1571,8 @@ public void checkCompaction() {
}

if (backlogEstimate > compactionThreshold) {
log.info("topic:{} backlogEstimate:{} is bigger than compactionThreshold:{}. Triggering compaction",
topic, backlogEstimate, compactionThreshold);
try {
triggerCompaction();
} catch (AlreadyRunningException are) {
Expand Down Expand Up @@ -2981,7 +2991,13 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
public synchronized void triggerCompaction()
throws PulsarServerException, AlreadyRunningException {
if (currentCompaction.isDone()) {
currentCompaction = brokerService.pulsar().getCompactor().compact(topic);

if (strategicCompactionMap.containsKey(topic)) {
currentCompaction = brokerService.pulsar().getStrategicCompactor()
.compact(topic, strategicCompactionMap.get(topic));
} else {
currentCompaction = brokerService.pulsar().getCompactor().compact(topic);
}
} else {
throw new AlreadyRunningException("Compaction already in progress");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ private <T> void phaseTwoLoop(String topic, Iterator<Message<T>> reader,
promise.completeExceptionally(e);
return;
}
outstanding.release(MAX_OUTSTANDING);
promise.complete(null);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.channel;

import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MAX_CLEAN_UP_DELAY_TIME_IN_SECS;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.ConnectionLost;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.Reconnected;
Expand All @@ -44,6 +45,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand All @@ -60,6 +62,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.TableViewImpl;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
Expand Down Expand Up @@ -89,6 +92,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
@Override
protected void setup() throws Exception {
conf.setAllowAutoTopicCreation(true);
conf.setBrokerServiceCompactionMonitorIntervalInSeconds(10);
super.internalSetup(conf);

admin.tenants().createTenant("pulsar", createDefaultTenantInfo());
Expand Down Expand Up @@ -289,8 +293,6 @@ public void assignmentTest()
var ownerAddr2 = channel2.getOwnerAsync(bundle).get();

assertEquals(ownerAddr1, ownerAddr2);
// TODO: check conflict resolution
// assertEquals(assignedAddr1, ownerAddr1);
assertEquals(getOwnerRequests1.size(), 0);
assertEquals(getOwnerRequests2.size(), 0);
}
Expand Down Expand Up @@ -682,6 +684,62 @@ public void handleBrokerDeletionEventTest()
true);
}

@Test(priority = 10)
public void conflictAndCompactionTest() throws ExecutionException, InterruptedException, TimeoutException,
IllegalAccessException, PulsarClientException, PulsarServerException {

var producer = (Producer<ServiceUnitStateData>) FieldUtils.readDeclaredField(channel1, "producer", true);
producer.newMessage().key(bundle).send();
var owner1 = channel1.getOwnerAsync(bundle);
var owner2 = channel2.getOwnerAsync(bundle);
assertNull(owner1.get());
assertNull(owner2.get());

var assigned1 = channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
assertNotNull(assigned1);

waitUntilNewOwner(channel1, bundle, lookupServiceAddress1);
waitUntilNewOwner(channel2, bundle, lookupServiceAddress1);
String assignedAddr1 = assigned1.get(5, TimeUnit.SECONDS);
assertEquals(lookupServiceAddress1, assignedAddr1);

FieldUtils.writeDeclaredField(channel2,
"inFlightStateWaitingTimeInMillis", 3 * 1000, true);
var assigned2 = channel2.publishAssignEventAsync(bundle, lookupServiceAddress2);
assertNotNull(assigned2);
Exception ex = null;
try {
assigned2.join();
} catch (CompletionException e) {
ex = e;
}
assertNotNull(ex);
assertEquals(TimeoutException.class, ex.getCause().getClass());
assertEquals(lookupServiceAddress1, channel2.getOwnerAsync(bundle).get());
assertEquals(lookupServiceAddress1, channel1.getOwnerAsync(bundle).get());

var compactor = spy (pulsar1.getStrategicCompactor());
FieldUtils.writeDeclaredField(pulsar1, "strategicCompactor", compactor, true);
FieldUtils.writeDeclaredField(pulsar2, "strategicCompactor", compactor, true);
Awaitility.await()
.pollInterval(200, TimeUnit.MILLISECONDS)
.atMost(140, TimeUnit.SECONDS)
.untilAsserted(() -> verify(compactor, times(1))
.compact(eq(ServiceUnitStateChannelImpl.TOPIC), any()));

var channel3 = new ServiceUnitStateChannelImpl(pulsar1);
channel3.start();
Awaitility.await()
.pollInterval(200, TimeUnit.MILLISECONDS)
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> assertEquals(
channel3.getOwnerAsync(bundle).get(), lookupServiceAddress1));
channel3.close();
FieldUtils.writeDeclaredField(channel2,
"inFlightStateWaitingTimeInMillis", 30 * 1000, true);
}


// TODO: add the channel recovery test when broker registry is added.

private static ConcurrentOpenHashMap<String, CompletableFuture<Optional<String>>> getOwnerRequests(
Expand Down Expand Up @@ -768,7 +826,7 @@ private static void waitUntilNewState(ServiceUnitStateChannel channel, String ke
if (actual == null) {
return true;
} else {
return actual.state() != ServiceUnitState.Owned;
return actual.state() != Owned;
}
});
}
Expand Down
Loading

0 comments on commit 3e7c2bf

Please sign in to comment.