diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 1532b28343c2b..06b41e46636ff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -146,6 +146,7 @@ import org.apache.pulsar.common.util.ThreadDumpUtil; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.apache.pulsar.compaction.Compactor; +import org.apache.pulsar.compaction.StrategicTwoPhaseCompactor; import org.apache.pulsar.compaction.TwoPhaseCompactor; import org.apache.pulsar.functions.worker.ErrorNotifier; import org.apache.pulsar.functions.worker.WorkerConfig; @@ -198,6 +199,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { private TopicPoliciesService topicPoliciesService = TopicPoliciesService.DISABLED; private BookKeeperClientFactory bkClientFactory; private Compactor compactor; + private StrategicTwoPhaseCompactor strategicCompactor; private ResourceUsageTransportManager resourceUsageTransportManager; private ResourceGroupService resourceGroupServiceManager; @@ -1473,6 +1475,19 @@ public Compactor getNullableCompactor() { return this.compactor; } + public StrategicTwoPhaseCompactor newStrategicCompactor() throws PulsarServerException { + return new StrategicTwoPhaseCompactor(this.getConfiguration(), + getClient(), getBookKeeperClient(), + getCompactorExecutor()); + } + + public synchronized StrategicTwoPhaseCompactor getStrategicCompactor() throws PulsarServerException { + if (this.strategicCompactor == null) { + this.strategicCompactor = newStrategicCompactor(); + } + return this.strategicCompactor; + } + protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImpl offloadPolicies) { if (this.offloaderScheduler == null) { this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder() diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java index cd1092a26ea04..3225c0ba7bbc7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java @@ -66,7 +66,9 @@ public enum ServiceUnitState { Splitting; // the service unit(e.g. bundle) is in the process of splitting. private static Map> validTransitions = Map.of( - Free, Set.of(Owned, Assigned), + // (Free -> Released | Splitting) transitions are required + // when the topic is compacted in the middle of transfer or split. + Free, Set.of(Owned, Assigned, Released, Splitting), Owned, Set.of(Assigned, Splitting, Free), Assigned, Set.of(Owned, Released, Free), Released, Set.of(Owned, Free), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index a476be974a30c..38e8afa50f302 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -43,7 +43,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.mutable.MutableInt; @@ -101,7 +101,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { private long totalCleanupCnt = 0; private long totalBrokerCleanupTombstoneCnt = 0; private long totalServiceUnitCleanupTombstoneCnt = 0; - private long totalServiceUnitCleanupErrorCnt = 0; + private AtomicLong totalCleanupErrorCnt = new AtomicLong(); private long totalCleanupScheduledCnt = 0; private long totalCleanupIgnoredCnt = 0; private long totalCleanupCancelledCnt = 0; @@ -175,10 +175,11 @@ public synchronized void start() throws PulsarServerException { } tableview = pulsar.getClient().newTableViewBuilder(schema) .topic(TOPIC) - // TODO: enable CompactionStrategy + .loadConf(Map.of( + "topicCompactionStrategyClassName", + ServiceUnitStateCompactionStrategy.class.getName())) .create(); - // TODO: schedule listen instead of foreachAndListen - tableview.forEachAndListen((key, value) -> handle(key, value)); + tableview.listen((key, value) -> handle(key, value)); log.debug("Successfully started the channel tableview."); pulsar.getLocalMetadataStore().registerSessionListener(this::handleMetadataSessionEvent); @@ -332,8 +333,6 @@ private void handle(String serviceUnit, ServiceUnitStateData data) { } ServiceUnitState state = data == null ? Free : data.state(); - - // TODO : Add state validation in tableview by the compaction strategy switch (state) { case Owned -> handleOwnEvent(serviceUnit, data); case Assigned -> handleAssignEvent(serviceUnit, data); @@ -599,7 +598,16 @@ private void scheduleCleanup(String broker, long delayInSecs) { .delayedExecutor(delayInSecs, TimeUnit.SECONDS, pulsar.getLoadManagerExecutor()); totalCleanupScheduledCnt++; return CompletableFuture - .runAsync(() -> doCleanup(broker), delayed); + .runAsync(() -> { + try { + doCleanup(broker); + } catch (Throwable e) { + log.error("Failed to run the cleanup job for the broker {}, " + + "totalCleanupErrorCnt:{}.", + broker, totalCleanupErrorCnt.incrementAndGet(), e); + } + } + , delayed); }); log.info("Scheduled ownership cleanup for broker:{} with delay:{} secs. Pending clean jobs:{}.", @@ -610,8 +618,8 @@ private void scheduleCleanup(String broker, long delayInSecs) { private void doCleanup(String broker) { long startTime = System.nanoTime(); log.info("Started ownership cleanup for the inactive broker:{}", broker); - AtomicInteger serviceUnitTombstoneCnt = new AtomicInteger(); - AtomicInteger serviceUnitTombstoneErrorCnt = new AtomicInteger(); + int serviceUnitTombstoneCnt = 0; + long totalCleanupErrorCntStart = totalCleanupErrorCnt.get(); for (Map.Entry etr : tableview.entrySet()) { ServiceUnitStateData stateData = etr.getValue(); String serviceUnit = etr.getKey(); @@ -619,14 +627,14 @@ private void doCleanup(String broker) { || StringUtils.equals(broker, stateData.sourceBroker())) { log.info("Cleaning ownership serviceUnit:{}, stateData:{}.", serviceUnit, stateData); tombstoneAsync(serviceUnit).whenComplete((__, e) -> { - if (e == null) { - serviceUnitTombstoneCnt.incrementAndGet(); - } else { - log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}.", - serviceUnit, stateData); - serviceUnitTombstoneErrorCnt.incrementAndGet(); + if (e != null) { + log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}, " + + "cleanupErrorCnt:{}.", + serviceUnit, stateData, + totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart); } }); + serviceUnitTombstoneCnt++; } } @@ -636,26 +644,22 @@ private void doCleanup(String broker) { log.error("Failed to flush the in-flight messages.", e); } - if (serviceUnitTombstoneCnt.get() > 0) { + if (serviceUnitTombstoneCnt > 0) { this.totalCleanupCnt++; - this.totalServiceUnitCleanupTombstoneCnt += serviceUnitTombstoneCnt.get(); + this.totalServiceUnitCleanupTombstoneCnt += serviceUnitTombstoneCnt; this.totalBrokerCleanupTombstoneCnt++; } - if (serviceUnitTombstoneErrorCnt.get() > 0) { - this.totalServiceUnitCleanupErrorCnt += serviceUnitTombstoneErrorCnt.get(); - } - double cleanupTime = TimeUnit.NANOSECONDS .toMillis((System.nanoTime() - startTime)); // TODO: clean load data stores log.info("Completed a cleanup for the inactive broker:{} in {} ms. " + "Published tombstone for orphan service units: serviceUnitTombstoneCnt:{}, " - + "serviceUnitTombstoneErrorCnt:{}, metrics:{} ", + + "approximate cleanupErrorCnt:{}, metrics:{} ", broker, cleanupTime, serviceUnitTombstoneCnt, - serviceUnitTombstoneErrorCnt, + totalCleanupErrorCntStart - totalCleanupErrorCnt.get(), printCleanupMetrics()); cleanupJobs.remove(broker); } @@ -675,8 +679,8 @@ private void monitorOwnerships(List brokers) { long startTime = System.nanoTime(); Set inactiveBrokers = new HashSet<>(); Set activeBrokers = new HashSet<>(brokers); - AtomicInteger serviceUnitTombstoneCnt = new AtomicInteger(); - AtomicInteger serviceUnitTombstoneErrorCnt = new AtomicInteger(); + int serviceUnitTombstoneCnt = 0; + long totalCleanupErrorCntStart = totalCleanupErrorCnt.get(); long now = System.currentTimeMillis(); for (Map.Entry etr : tableview.entrySet()) { String serviceUnit = etr.getKey(); @@ -690,14 +694,14 @@ private void monitorOwnerships(List brokers) { serviceUnit, stateData); tombstoneAsync(serviceUnit).whenComplete((__, e) -> { - if (e == null) { - serviceUnitTombstoneCnt.incrementAndGet(); - } else { - log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}.", - serviceUnit, stateData); - serviceUnitTombstoneErrorCnt.incrementAndGet(); + if (e != null) { + log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}, " + + "cleanupErrorCnt:{}.", + serviceUnit, stateData, + totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart); } }); + serviceUnitTombstoneCnt++; } } @@ -711,22 +715,21 @@ private void monitorOwnerships(List brokers) { log.error("Failed to flush the in-flight messages.", e); } - if (serviceUnitTombstoneCnt.get() > 0) { - this.totalServiceUnitCleanupTombstoneCnt += serviceUnitTombstoneCnt.get(); + if (serviceUnitTombstoneCnt > 0) { + this.totalServiceUnitCleanupTombstoneCnt += serviceUnitTombstoneCnt; } - this.totalServiceUnitCleanupErrorCnt += serviceUnitTombstoneErrorCnt.get(); double monitorTime = TimeUnit.NANOSECONDS .toMillis((System.nanoTime() - startTime)); log.info("Completed the ownership monitor run in {} ms. " + "Scheduled cleanups for inactiveBrokers:{}. inactiveBrokerCount:{}. " + "Published tombstone for orphan service units: serviceUnitTombstoneCnt:{}, " - + "serviceUnitTombstoneErrorCnt:{}, metrics:{} ", + + "approximate cleanupErrorCnt:{}, metrics:{} ", monitorTime, inactiveBrokers, inactiveBrokers.size(), serviceUnitTombstoneCnt, - serviceUnitTombstoneErrorCnt, + totalCleanupErrorCntStart - totalCleanupErrorCnt.get(), printCleanupMetrics()); } @@ -734,13 +737,13 @@ private void monitorOwnerships(List brokers) { private String printCleanupMetrics() { return String.format( "{totalCleanupCnt:%d, totalBrokerCleanupTombstoneCnt:%d, " - + "totalServiceUnitCleanupTombstoneCnt:%d, totalServiceUnitCleanupErrorCnt:%d, " + + "totalServiceUnitCleanupTombstoneCnt:%d, totalCleanupErrorCnt:%d, " + "totalCleanupScheduledCnt%d, totalCleanupIgnoredCnt:%d, totalCleanupCancelledCnt:%d, " + " activeCleanupJobs:%d}", totalCleanupCnt, totalBrokerCleanupTombstoneCnt, totalServiceUnitCleanupTombstoneCnt, - totalServiceUnitCleanupErrorCnt, + totalCleanupErrorCnt.get(), totalCleanupScheduledCnt, totalCleanupIgnoredCnt, totalCleanupCancelledCnt, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java new file mode 100644 index 0000000000000..2b21f830dda92 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions.channel; + +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigned; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Free; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Released; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Splitting; +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.lang.StringUtils; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.topics.TopicCompactionStrategy; + +public class ServiceUnitStateCompactionStrategy implements TopicCompactionStrategy { + + private final Schema schema; + + private boolean checkBrokers = true; + + public ServiceUnitStateCompactionStrategy() { + schema = Schema.JSON(ServiceUnitStateData.class); + } + + @Override + public Schema getSchema() { + return schema; + } + + @VisibleForTesting + public void checkBrokers(boolean check) { + this.checkBrokers = check; + } + + @Override + public boolean shouldKeepLeft(ServiceUnitStateData from, ServiceUnitStateData to) { + ServiceUnitState prevState = from == null ? Free : from.state(); + ServiceUnitState state = to == null ? Free : to.state(); + if (!ServiceUnitState.isValidTransition(prevState, state)) { + return true; + } + + if (checkBrokers) { + if (prevState == Free && (state == Assigned || state == Owned)) { + // Free -> Assigned || Owned broker check + return StringUtils.isBlank(to.broker()); + } else if (prevState == Owned && state == Assigned) { + // Owned -> Assigned(transfer) broker check + return !StringUtils.equals(from.broker(), to.sourceBroker()) + || StringUtils.isBlank(to.broker()) + || StringUtils.equals(from.broker(), to.broker()); + } else if (prevState == Assigned && state == Released) { + // Assigned -> Released(transfer) broker check + return !StringUtils.equals(from.broker(), to.broker()) + || !StringUtils.equals(from.sourceBroker(), to.sourceBroker()); + } else if (prevState == Released && state == Owned) { + // Released -> Owned(transfer) broker check + return !StringUtils.equals(from.broker(), to.broker()) + || !StringUtils.equals(from.sourceBroker(), to.sourceBroker()); + } else if (prevState == Assigned && state == Owned) { + // Assigned -> Owned broker check + return !StringUtils.equals(from.broker(), to.broker()) + || !StringUtils.equals(from.sourceBroker(), to.sourceBroker()); + } else if (prevState == Owned && state == Splitting) { + // Owned -> Splitting broker check + return !StringUtils.equals(from.broker(), to.broker()); + } + } + + return false; + } + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index dda9c89b726ee..d009d3778f2d1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -78,6 +78,8 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateCompactionStrategy; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources; import org.apache.pulsar.broker.service.AbstractReplicator; @@ -152,6 +154,7 @@ import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.schema.SchemaData; +import org.apache.pulsar.common.topics.TopicCompactionStrategy; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; @@ -203,6 +206,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal private CompletableFuture currentCompaction = CompletableFuture.completedFuture(COMPACTION_NEVER_RUN); private final CompactedTopic compactedTopic; + // TODO: Create compaction strategy from topic policy when exposing strategic compaction to users. + private static Map strategicCompactionMap = Map.of( + ServiceUnitStateChannelImpl.TOPIC, + new ServiceUnitStateCompactionStrategy()); + private CompletableFuture currentOffload = CompletableFuture.completedFuture( (MessageIdImpl) MessageId.earliest); @@ -1571,6 +1579,11 @@ public void checkCompaction() { } if (backlogEstimate > compactionThreshold) { + if (log.isDebugEnabled()) { + log.debug( + "topic:{} backlogEstimate:{} is bigger than compactionThreshold:{}. Triggering " + + "compaction", topic, backlogEstimate, compactionThreshold); + } try { triggerCompaction(); } catch (AlreadyRunningException are) { @@ -3000,7 +3013,13 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { public synchronized void triggerCompaction() throws PulsarServerException, AlreadyRunningException { if (currentCompaction.isDone()) { - currentCompaction = brokerService.pulsar().getCompactor().compact(topic); + + if (strategicCompactionMap.containsKey(topic)) { + currentCompaction = brokerService.pulsar().getStrategicCompactor() + .compact(topic, strategicCompactionMap.get(topic)); + } else { + currentCompaction = brokerService.pulsar().getCompactor().compact(topic); + } } else { throw new AlreadyRunningException("Compaction already in progress"); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java index bb0850efab4af..9dc4ec649b62b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java @@ -385,6 +385,7 @@ private void phaseTwoLoop(String topic, Iterator> reader, promise.completeExceptionally(e); return; } + outstanding.release(MAX_OUTSTANDING); promise.complete(null); return; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index ad4d0cb2f0b56..a16c2be6612bd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.loadbalance.extensions.channel; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MAX_CLEAN_UP_DELAY_TIME_IN_SECS; import static org.apache.pulsar.metadata.api.extended.SessionEvent.ConnectionLost; import static org.apache.pulsar.metadata.api.extended.SessionEvent.Reconnected; @@ -44,6 +45,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -51,6 +53,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.PulsarServerException; @@ -60,6 +63,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.models.Split; import org.apache.pulsar.broker.loadbalance.extensions.models.Unload; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.impl.TableViewImpl; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; @@ -89,6 +93,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest { @Override protected void setup() throws Exception { conf.setAllowAutoTopicCreation(true); + conf.setBrokerServiceCompactionMonitorIntervalInSeconds(10); super.internalSetup(conf); admin.tenants().createTenant("pulsar", createDefaultTenantInfo()); @@ -289,8 +294,6 @@ public void assignmentTest() var ownerAddr2 = channel2.getOwnerAsync(bundle).get(); assertEquals(ownerAddr1, ownerAddr2); - // TODO: check conflict resolution - // assertEquals(assignedAddr1, ownerAddr1); assertEquals(getOwnerRequests1.size(), 0); assertEquals(getOwnerRequests2.size(), 0); } @@ -567,7 +570,7 @@ public void handleBrokerDeletionEventTest() assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); + assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); @@ -592,7 +595,7 @@ public void handleBrokerDeletionEventTest() assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); + assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); @@ -608,7 +611,7 @@ public void handleBrokerDeletionEventTest() assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); + assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); @@ -626,7 +629,7 @@ public void handleBrokerDeletionEventTest() assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); + assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); assertEquals(3, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); @@ -644,7 +647,7 @@ public void handleBrokerDeletionEventTest() assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); assertEquals(4, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); + assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); assertEquals(3, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); @@ -669,7 +672,7 @@ public void handleBrokerDeletionEventTest() assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); assertEquals(4, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); + assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); assertEquals(3, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); @@ -682,6 +685,62 @@ public void handleBrokerDeletionEventTest() true); } + @Test(priority = 10) + public void conflictAndCompactionTest() throws ExecutionException, InterruptedException, TimeoutException, + IllegalAccessException, PulsarClientException, PulsarServerException { + + var producer = (Producer) FieldUtils.readDeclaredField(channel1, "producer", true); + producer.newMessage().key(bundle).send(); + var owner1 = channel1.getOwnerAsync(bundle); + var owner2 = channel2.getOwnerAsync(bundle); + assertNull(owner1.get()); + assertNull(owner2.get()); + + var assigned1 = channel1.publishAssignEventAsync(bundle, lookupServiceAddress1); + assertNotNull(assigned1); + + waitUntilNewOwner(channel1, bundle, lookupServiceAddress1); + waitUntilNewOwner(channel2, bundle, lookupServiceAddress1); + String assignedAddr1 = assigned1.get(5, TimeUnit.SECONDS); + assertEquals(lookupServiceAddress1, assignedAddr1); + + FieldUtils.writeDeclaredField(channel2, + "inFlightStateWaitingTimeInMillis", 3 * 1000, true); + var assigned2 = channel2.publishAssignEventAsync(bundle, lookupServiceAddress2); + assertNotNull(assigned2); + Exception ex = null; + try { + assigned2.join(); + } catch (CompletionException e) { + ex = e; + } + assertNotNull(ex); + assertEquals(TimeoutException.class, ex.getCause().getClass()); + assertEquals(lookupServiceAddress1, channel2.getOwnerAsync(bundle).get()); + assertEquals(lookupServiceAddress1, channel1.getOwnerAsync(bundle).get()); + + var compactor = spy (pulsar1.getStrategicCompactor()); + FieldUtils.writeDeclaredField(pulsar1, "strategicCompactor", compactor, true); + FieldUtils.writeDeclaredField(pulsar2, "strategicCompactor", compactor, true); + Awaitility.await() + .pollInterval(200, TimeUnit.MILLISECONDS) + .atMost(140, TimeUnit.SECONDS) + .untilAsserted(() -> verify(compactor, times(1)) + .compact(eq(ServiceUnitStateChannelImpl.TOPIC), any())); + + var channel3 = new ServiceUnitStateChannelImpl(pulsar1); + channel3.start(); + Awaitility.await() + .pollInterval(200, TimeUnit.MILLISECONDS) + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals( + channel3.getOwnerAsync(bundle).get(), lookupServiceAddress1)); + channel3.close(); + FieldUtils.writeDeclaredField(channel2, + "inFlightStateWaitingTimeInMillis", 30 * 1000, true); + } + + // TODO: add the channel recovery test when broker registry is added. private static ConcurrentOpenHashMap>> getOwnerRequests( @@ -768,7 +827,7 @@ private static void waitUntilNewState(ServiceUnitStateChannel channel, String ke if (actual == null) { return true; } else { - return actual.state() != ServiceUnitState.Owned; + return actual.state() != Owned; } }); } @@ -784,6 +843,11 @@ private static void cleanTableView(ServiceUnitStateChannel channel, String servi private static long getCleanupMetric(ServiceUnitStateChannel channel, String metric) throws IllegalAccessException { - return (long) FieldUtils.readDeclaredField(channel, metric, true); + Object var = FieldUtils.readDeclaredField(channel, metric, true); + if (var instanceof AtomicLong) { + return ((AtomicLong) var).get(); + } else { + return (long) var; + } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategyTest.java new file mode 100644 index 0000000000000..49b55f7660a81 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategyTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions.channel; + +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigned; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Free; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Released; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Splitting; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.testng.Assert.assertTrue; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class ServiceUnitStateCompactionStrategyTest { + ServiceUnitStateCompactionStrategy strategy = new ServiceUnitStateCompactionStrategy(); + + ServiceUnitStateData data(ServiceUnitState state) { + return new ServiceUnitStateData(state, "broker"); + } + + ServiceUnitStateData data(ServiceUnitState state, String dst) { + return new ServiceUnitStateData(state, dst, "broker"); + } + ServiceUnitStateData data(ServiceUnitState state, String src, String dst) { + return new ServiceUnitStateData(state, dst, src); + } + + @Test + public void test() throws InterruptedException { + String dst = "dst"; + assertTrue(strategy.shouldKeepLeft(data(Free), data(Free))); + assertFalse(strategy.shouldKeepLeft(data(Free), data(Assigned))); + assertTrue(strategy.shouldKeepLeft(data(Free), data(Assigned, ""))); + assertFalse(strategy.shouldKeepLeft(data(Free), data(Owned))); + assertTrue(strategy.shouldKeepLeft(data(Free), data(Owned, ""))); + assertFalse(strategy.shouldKeepLeft(data(Free), data(Released))); + assertFalse(strategy.shouldKeepLeft(data(Free), data(Splitting))); + + assertFalse(strategy.shouldKeepLeft(data(Assigned), data(Free))); + assertTrue(strategy.shouldKeepLeft(data(Assigned), data(Assigned))); + assertTrue(strategy.shouldKeepLeft(data(Assigned, "dst2"), data(Owned, dst))); + assertTrue(strategy.shouldKeepLeft(data(Assigned, "src1", dst), data(Owned, "src2", dst))); + assertFalse(strategy.shouldKeepLeft(data(Assigned), data(Owned))); + assertTrue(strategy.shouldKeepLeft(data(Assigned, "dst2"), data(Released, dst))); + assertTrue(strategy.shouldKeepLeft(data(Assigned, "src1", dst), data(Released, "src2", dst))); + assertFalse(strategy.shouldKeepLeft(data(Assigned, dst), data(Released, dst))); + assertTrue(strategy.shouldKeepLeft(data(Assigned), data(Splitting))); + + assertFalse(strategy.shouldKeepLeft(data(Owned), data(Free))); + assertTrue(strategy.shouldKeepLeft(data(Owned), data(Assigned))); + assertTrue(strategy.shouldKeepLeft(data(Owned), data(Assigned, ""))); + assertTrue(strategy.shouldKeepLeft(data(Owned), data(Assigned, "src", dst))); + assertFalse(strategy.shouldKeepLeft(data(Owned), data(Assigned, dst))); + assertTrue(strategy.shouldKeepLeft(data(Owned), data(Owned))); + assertTrue(strategy.shouldKeepLeft(data(Owned), data(Released))); + assertTrue(strategy.shouldKeepLeft(data(Owned,"dst2"), data(Splitting, dst))); + assertFalse(strategy.shouldKeepLeft(data(Owned), data(Splitting))); + + assertFalse(strategy.shouldKeepLeft(data(Released), data(Free))); + assertTrue(strategy.shouldKeepLeft(data(Released), data(Assigned))); + assertTrue(strategy.shouldKeepLeft(data(Released, "dst2"), data(Owned, dst))); + assertTrue(strategy.shouldKeepLeft(data(Released, "src1", dst), data(Owned, "src2", dst))); + assertFalse(strategy.shouldKeepLeft(data(Released), data(Owned))); + assertTrue(strategy.shouldKeepLeft(data(Released), data(Released))); + assertTrue(strategy.shouldKeepLeft(data(Released), data(Splitting))); + + assertFalse(strategy.shouldKeepLeft(data(Splitting), data(Free))); + assertTrue(strategy.shouldKeepLeft(data(Splitting), data(Assigned))); + assertTrue(strategy.shouldKeepLeft(data(Splitting), data(Owned))); + assertTrue(strategy.shouldKeepLeft(data(Splitting), data(Released))); + assertTrue(strategy.shouldKeepLeft(data(Splitting), data(Splitting))); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTest.java index 304d1df29c971..69e6a2d204c0e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTest.java @@ -36,8 +36,8 @@ public void testTransitions() { assertFalse(ServiceUnitState.isValidTransition(Free, Free)); assertTrue(ServiceUnitState.isValidTransition(Free, Assigned)); assertTrue(ServiceUnitState.isValidTransition(Free, Owned)); - assertFalse(ServiceUnitState.isValidTransition(Free, Released)); - assertFalse(ServiceUnitState.isValidTransition(Free, Splitting)); + assertTrue(ServiceUnitState.isValidTransition(Free, Released)); + assertTrue(ServiceUnitState.isValidTransition(Free, Splitting)); assertTrue(ServiceUnitState.isValidTransition(Assigned, Free)); assertFalse(ServiceUnitState.isValidTransition(Assigned, Assigned)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java new file mode 100644 index 0000000000000..41eaa640d28db --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java @@ -0,0 +1,831 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Free; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigned; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Splitting; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.isValidTransition; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.commons.lang.reflect.FieldUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateCompactionStrategy; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; + +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker-compaction") +public class ServiceUnitStateCompactionTest extends MockedPulsarServiceBaseTest { + private ScheduledExecutorService compactionScheduler; + private BookKeeper bk; + private Schema schema; + private ServiceUnitStateCompactionStrategy strategy; + + private ServiceUnitState testState0 = Free; + private ServiceUnitState testState1 = Free; + private ServiceUnitState testState2 = Free; + private ServiceUnitState testState3 = Free; + private ServiceUnitState testState4 = Free; + + private static Random RANDOM = new Random(); + + + private ServiceUnitStateData testValue(ServiceUnitState state, String broker) { + if (state == Free) { + return null; + } + return new ServiceUnitStateData(state, broker); + } + + private ServiceUnitStateData testValue0(String broker) { + ServiceUnitState to = nextValidState(testState0); + testState0 = to; + return testValue(to, broker); + } + + private ServiceUnitStateData testValue1(String broker) { + ServiceUnitState to = nextValidState(testState1); + testState1 = to; + return testValue(to, broker); + } + + private ServiceUnitStateData testValue2(String broker) { + ServiceUnitState to = nextValidState(testState2); + testState2 = to; + return testValue(to, broker); + } + + private ServiceUnitStateData testValue3(String broker) { + ServiceUnitState to = nextValidState(testState3); + testState3 = to; + return testValue(to, broker); + } + + private ServiceUnitStateData testValue4(String broker) { + ServiceUnitState to = nextValidState(testState4); + testState4 = to; + return testValue(to, broker); + } + + private ServiceUnitState nextValidState(ServiceUnitState from) { + List candidates = Arrays.stream(ServiceUnitState.values()) + .filter(to -> to != Free && to != Splitting && isValidTransition(from, to)) + .collect(Collectors.toList()); + var state= candidates.get(RANDOM.nextInt(candidates.size())); + return state; + } + + private ServiceUnitState nextInvalidState(ServiceUnitState from) { + List candidates = Arrays.stream(ServiceUnitState.values()) + .filter(to -> !isValidTransition(from, to)) + .collect(Collectors.toList()); + if (candidates.size() == 0) { + return null; + } + return candidates.get(RANDOM.nextInt(candidates.size())); + } + + private List nextStatesToNull(ServiceUnitState from) { + if (from == null) { + return List.of(); + } + return switch (from) { + case Assigned -> List.of(Owned); + case Owned -> List.of(); + case Splitting -> List.of(); + default -> List.of(); + }; + } + + @BeforeMethod + @Override + public void setup() throws Exception { + super.internalSetup(); + + admin.clusters().createCluster("use", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); + admin.tenants().createTenant("my-property", + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use"))); + admin.namespaces().createNamespace("my-property/use/my-ns"); + + compactionScheduler = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build()); + bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null); + schema = Schema.JSON(ServiceUnitStateData.class); + strategy = new ServiceUnitStateCompactionStrategy(); + strategy.checkBrokers(false); + + } + + + @AfterMethod(alwaysRun = true) + @Override + public void cleanup() throws Exception { + super.internalCleanup(); + + if (compactionScheduler != null) { + compactionScheduler.shutdownNow(); + } + } + + + public record TestData( + String topic, + Map expected, + List> all) { + + } + TestData generateTestData() throws PulsarAdminException, PulsarClientException { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + final int numMessages = 20; + final int maxKeys = 5; + + // Configure retention to ensue data is retained for reader + admin.namespaces().setRetention("my-property/use/my-ns", new RetentionPolicies(-1, -1)); + + Producer producer = pulsarClient.newProducer(schema) + .topic(topic) + .enableBatching(true) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + + Map expected = new HashMap<>(); + List> all = new ArrayList<>(); + Random r = new Random(0); + + pulsarClient.newConsumer(schema) + .topic(topic) + .subscriptionName("sub1") + .readCompacted(true) + .subscribe().close(); + + for (int j = 0; j < numMessages; j++) { + int keyIndex = r.nextInt(maxKeys); + String key = "key" + keyIndex; + ServiceUnitStateData prev = expected.get(key); + ServiceUnitState prevState = prev == null ? Free : prev.state(); + ServiceUnitState state = r.nextBoolean() ? nextInvalidState(prevState) : + nextValidState(prevState); + ServiceUnitStateData value = new ServiceUnitStateData(state, key + ":" + j); + producer.newMessage().key(key).value(value).send(); + if (!strategy.shouldKeepLeft(prev, value)) { + expected.put(key, value); + } + all.add(Pair.of(key, value)); + } + return new TestData(topic, expected, all); + } + + @Test + public void testCompaction() throws Exception { + TestData testData = generateTestData(); + var topic = testData.topic; + var expected = testData.expected; + var all = testData.all; + + StrategicTwoPhaseCompactor compactor + = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic, strategy).get(); + + PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic, false); + // Compacted topic ledger should have same number of entry equals to number of unique key. + //Assert.assertEquals(internalStats.compactedLedger.entries, expected.size()); + Assert.assertTrue(internalStats.compactedLedger.ledgerId > -1); + Assert.assertFalse(internalStats.compactedLedger.offloaded); + + // consumer with readCompacted enabled only get compacted entries + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe()) { + while (true) { + Message m = consumer.receive(2, TimeUnit.SECONDS); + Assert.assertEquals(expected.remove(m.getKey()), m.getValue()); + if (expected.isEmpty()) { + break; + } + } + Assert.assertTrue(expected.isEmpty()); + } + + // can get full backlog if read compacted disabled + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(false).subscribe()) { + while (true) { + Message m = consumer.receive(2, TimeUnit.SECONDS); + Pair expectedMessage = all.remove(0); + Assert.assertEquals(expectedMessage.getLeft(), m.getKey()); + Assert.assertEquals(expectedMessage.getRight(), m.getValue()); + if (all.isEmpty()) { + break; + } + } + Assert.assertTrue(all.isEmpty()); + } + } + + @Test + public void testCompactionWithReader() throws Exception { + TestData testData = generateTestData(); + var topic = testData.topic; + var expected = testData.expected; + var all = testData.all; + + StrategicTwoPhaseCompactor compactor + = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic, strategy).get(); + + // consumer with readCompacted enabled only get compacted entries + try (Reader reader = pulsarClient.newReader(schema).topic(topic).readCompacted(true) + .startMessageId(MessageId.earliest).create()) { + while (true) { + Message m = reader.readNext(2, TimeUnit.SECONDS); + Assert.assertEquals(expected.remove(m.getKey()), m.getValue()); + if (expected.isEmpty()) { + break; + } + } + Assert.assertTrue(expected.isEmpty()); + } + + // can get full backlog if read compacted disabled + try (Reader reader = pulsarClient.newReader(schema).topic(topic).readCompacted(false) + .startMessageId(MessageId.earliest).create()) { + while (true) { + Message m = reader.readNext(2, TimeUnit.SECONDS); + Pair expectedMessage = all.remove(0); + Assert.assertEquals(expectedMessage.getLeft(), m.getKey()); + Assert.assertEquals(expectedMessage.getRight(), m.getValue()); + if (all.isEmpty()) { + break; + } + } + Assert.assertTrue(all.isEmpty()); + } + } + + + @Test + public void testCompactionWithTableview() throws Exception { + var tv = pulsar.getClient().newTableViewBuilder(schema) + .topic("persistent://my-property/use/my-ns/my-topic1") + .loadConf(Map.of( + "topicCompactionStrategyClassName", + ServiceUnitStateCompactionStrategy.class.getName())) + .create(); + + ((ServiceUnitStateCompactionStrategy) + FieldUtils.readDeclaredField(tv, "compactionStrategy", true)) + .checkBrokers(false); + TestData testData = generateTestData(); + var topic = testData.topic; + var expected = testData.expected; + var expectedCopy = new HashMap<>(expected); + + Awaitility.await() + .pollInterval(200, TimeUnit.MILLISECONDS) + .atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals(expectedCopy.size(), tv.size())); + + for(var etr : tv.entrySet()){ + Assert.assertEquals(expectedCopy.remove(etr.getKey()), etr.getValue()); + if (expectedCopy.isEmpty()) { + break; + } + } + + Assert.assertTrue(expectedCopy.isEmpty()); + tv.close();; + + StrategicTwoPhaseCompactor compactor + = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic, strategy).get(); + + // consumer with readCompacted enabled only get compacted entries + var tableview = pulsar.getClient().newTableViewBuilder(schema) + .topic(topic) + .loadConf(Map.of( + "topicCompactionStrategyClassName", + ServiceUnitStateCompactionStrategy.class.getName())) + .create(); + for(var etr : tableview.entrySet()){ + Assert.assertEquals(expected.remove(etr.getKey()), etr.getValue()); + if (expected.isEmpty()) { + break; + } + } + Assert.assertTrue(expected.isEmpty()); + tableview.close(); + + } + + + @Test + public void testReadCompactedBeforeCompaction() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + Producer producer = pulsarClient.newProducer(schema) + .topic(topic) + .enableBatching(true) + .create(); + + pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); + + producer.newMessage().key("key0").value(testValue0( "content0")).send(); + producer.newMessage().key("key0").value(testValue0("content1")).send(); + producer.newMessage().key("key0").value(testValue0( "content2")).send(); + + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe()) { + Message m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getValue().broker(), "content0"); + + m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getValue().broker(), "content1"); + + m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getValue().broker(), "content2"); + } + + StrategicTwoPhaseCompactor compactor + = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic, strategy).get(); + + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe()) { + Message m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getValue().broker(), "content2"); + } + } + + @Test + public void testReadEntriesAfterCompaction() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + Producer producer = pulsarClient.newProducer(schema) + .topic(topic) + .enableBatching(true) + .create(); + + pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); + + producer.newMessage().key("key0").value(testValue0( "content0")).send(); + producer.newMessage().key("key0").value(testValue0("content1")).send(); + producer.newMessage().key("key0").value(testValue0( "content2")).send(); + + StrategicTwoPhaseCompactor compactor + = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic, strategy).get(); + + producer.newMessage().key("key0").value(testValue0("content3")).send(); + + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe()) { + Message m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getValue().broker(), "content2"); + + m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getValue().broker(), "content3"); + } + } + + @Test + public void testSeekEarliestAfterCompaction() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + Producer producer = pulsarClient.newProducer(schema) + .topic(topic) + .enableBatching(true) + .create(); + + producer.newMessage().key("key0").value(testValue0( "content0")).send(); + producer.newMessage().key("key0").value(testValue0("content1")).send(); + producer.newMessage().key("key0").value(testValue0( "content2")).send(); + + StrategicTwoPhaseCompactor compactor + = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic, strategy).get(); + + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe()) { + consumer.seek(MessageId.earliest); + Message m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getValue().broker(), "content2"); + } + + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(false).subscribe()) { + consumer.seek(MessageId.earliest); + + Message m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getValue().broker(), "content0"); + + m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getValue().broker(), "content1"); + + m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getValue().broker(), "content2"); + } + } + + @Test + public void testBrokerRestartAfterCompaction() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + Producer producer = pulsarClient.newProducer(schema) + .topic(topic) + .enableBatching(true) + .create(); + + pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); + + producer.newMessage().key("key0").value(testValue0( "content0")).send(); + producer.newMessage().key("key0").value(testValue0("content1")).send(); + producer.newMessage().key("key0").value(testValue0( "content2")).send(); + + StrategicTwoPhaseCompactor compactor + = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic, strategy).get(); + + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe()) { + Message m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getValue().broker(), "content2"); + } + + stopBroker(); + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe()) { + consumer.receive(); + Assert.fail("Shouldn't have been able to receive anything"); + } catch (PulsarClientException e) { + // correct behaviour + } + startBroker(); + + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe()) { + Message m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getValue().broker(), "content2"); + } + } + + @Test + public void testCompactEmptyTopic() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + Producer producer = pulsarClient.newProducer(schema) + .topic(topic) + .enableBatching(true) + .create(); + + pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); + + StrategicTwoPhaseCompactor compactor + = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic, strategy).get(); + + producer.newMessage().key("key0").value(testValue0( "content0")).send(); + + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe()) { + Message m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getValue().broker(), "content0"); + } + } + + @Test + public void testWholeBatchCompactedOut() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + // subscribe before sending anything, so that we get all messages + pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe().close(); + + try (Producer producerNormal = pulsarClient.newProducer(schema).topic(topic) + .enableBatching(true) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + Producer producerBatch = pulsarClient.newProducer(schema).topic(topic) + .maxPendingMessages(3) + .enableBatching(true) + .batchingMaxMessages(3) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create()) { + producerBatch.newMessage().key("key1").value(testValue1("my-message-1")).sendAsync(); + producerBatch.newMessage().key("key1").value(testValue1( "my-message-2")).sendAsync(); + producerBatch.newMessage().key("key1").value(testValue1("my-message-3")).sendAsync(); + producerNormal.newMessage().key("key1").value(testValue1( "my-message-4")).send(); + } + + // compact the topic + StrategicTwoPhaseCompactor compactor + = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic, strategy).get(); + + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic) + .subscriptionName("sub1").readCompacted(true).subscribe()) { + Message message = consumer.receive(); + Assert.assertEquals(message.getKey(), "key1"); + Assert.assertEquals(new String(message.getValue().broker()), "my-message-4"); + } + } + + public void testCompactionWithLastDeletedKey() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + Producer producer = pulsarClient.newProducer(schema).topic(topic).enableBatching(true) + .messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + + pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); + + producer.newMessage().key("1").value(testValue(Owned, "1")).send(); + producer.newMessage().key("2").value(testValue(Owned, "3")).send(); + producer.newMessage().key("3").value(testValue(Owned, "5")).send(); + producer.newMessage().key("1").value(null).send(); + producer.newMessage().key("2").value(null).send(); + + StrategicTwoPhaseCompactor compactor + = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic, strategy).get(); + + Set expected = Sets.newHashSet("3"); + // consumer with readCompacted enabled only get compacted entries + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe()) { + Message m = consumer.receive(2, TimeUnit.SECONDS); + assertTrue(expected.remove(m.getKey())); + } + } + + @Test(timeOut = 20000) + public void testEmptyCompactionLedger() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + Producer producer = pulsarClient.newProducer(schema).topic(topic).enableBatching(true) + .messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + + pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); + + producer.newMessage().key("1").value(testValue(Owned, "1")).send(); + producer.newMessage().key("2").value(testValue(Owned, "3")).send(); + producer.newMessage().key("1").value(null).send(); + producer.newMessage().key("2").value(null).send(); + + StrategicTwoPhaseCompactor compactor + = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic, strategy).get(); + + // consumer with readCompacted enabled only get compacted entries + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe()) { + Message m = consumer.receive(2, TimeUnit.SECONDS); + assertNull(m); + } + } + + @Test(timeOut = 20000) + public void testAllEmptyCompactionLedger() throws Exception { + final String topic = + "persistent://my-property/use/my-ns/testAllEmptyCompactionLedger" + UUID.randomUUID().toString(); + + final int messages = 10; + + // 1.create producer and publish message to the topic. + ProducerBuilder builder = pulsarClient.newProducer(schema).topic(topic); + builder.batchingMaxMessages(messages / 5); + + Producer producer = builder.create(); + + List> futures = new ArrayList<>(messages); + for (int i = 0; i < messages; i++) { + futures.add(producer.newMessage().key("1").value(null).sendAsync()); + } + + FutureUtil.waitForAll(futures).get(); + + // 2.compact the topic. + StrategicTwoPhaseCompactor compactor + = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic, strategy).get(); + + // consumer with readCompacted enabled only get compacted entries + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) { + Message m = consumer.receive(2, TimeUnit.SECONDS); + assertNull(m); + } + } + + @Test(timeOut = 20000) + public void testCompactMultipleTimesWithoutEmptyMessage() + throws PulsarClientException, ExecutionException, InterruptedException { + final String topic = + "persistent://my-property/use/my-ns/testCompactMultipleTimesWithoutEmptyMessage" + UUID.randomUUID() + .toString(); + + final int messages = 10; + final String key = "1"; + + // 1.create producer and publish message to the topic. + ProducerBuilder builder = pulsarClient.newProducer(schema).topic(topic); + builder.enableBatching(true); + + + Producer producer = builder.create(); + + List> futures = new ArrayList<>(messages); + for (int i = 0; i < messages; i++) { + futures.add(producer.newMessage().key(key).value(testValue0((i + ""))).sendAsync()); + } + + FutureUtil.waitForAll(futures).get(); + + // 2.compact the topic. + StrategicTwoPhaseCompactor compactor + = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic, strategy).get(); + + // 3. Send more ten messages + futures.clear(); + for (int i = 0; i < messages; i++) { + futures.add(producer.newMessage().key(key).value(testValue0((i + 10 + ""))).sendAsync()); + } + FutureUtil.waitForAll(futures).get(); + + // 4.compact again. + compactor.compact(topic, strategy).get(); + + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) { + Message m1 = consumer.receive(); + assertNotNull(m1); + assertEquals(m1.getKey(), key); + assertEquals(m1.getValue().broker(), "19"); + Message none = consumer.receive(2, TimeUnit.SECONDS); + assertNull(none); + } + } + + @Test(timeOut = 200000) + public void testReadUnCompacted() + throws PulsarClientException, ExecutionException, InterruptedException { + final String topic = "persistent://my-property/use/my-ns/testReadUnCompacted" + UUID.randomUUID().toString(); + + final int messages = 10; + final String key = "1"; + + // 1.create producer and publish message to the topic. + ProducerBuilder builder = pulsarClient.newProducer(schema).topic(topic); + builder.batchingMaxMessages(messages / 5); + + Producer producer = builder.create(); + + List> futures = new ArrayList<>(messages); + for (int i = 0; i < messages; i++) { + futures.add(producer.newMessage().key(key).value(testValue0((i + ""))).sendAsync()); + } + + FutureUtil.waitForAll(futures).get(); + + // 2.compact the topic. + StrategicTwoPhaseCompactor compactor + = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic, strategy).get(); + + // 3. Send more ten messages + futures.clear(); + for (int i = 0; i < messages; i++) { + futures.add(producer.newMessage().key(key).value(testValue0((i + 10 + ""))).sendAsync()); + } + FutureUtil.waitForAll(futures).get(); + try (Consumer consumer = pulsarClient.newConsumer(schema) + .topic(topic) + .subscriptionName("sub1") + .readCompacted(true) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe()) { + for (int i = 0; i < 11; i++) { + Message received = consumer.receive(); + assertNotNull(received); + assertEquals(received.getKey(), key); + assertEquals(received.getValue().broker(), i + 9 + ""); + consumer.acknowledge(received); + } + Message none = consumer.receive(2, TimeUnit.SECONDS); + assertNull(none); + } + + // 4.Send empty message to delete the key-value in the compacted topic. + for (ServiceUnitState state : nextStatesToNull(testState0)) { + producer.newMessage().key(key).value(new ServiceUnitStateData(state, "xx")).send(); + } + producer.newMessage().key(key).value(null).send(); + + // 5.compact the topic. + compactor.compact(topic, strategy).get(); + + try (Consumer consumer = pulsarClient.newConsumer(schema) + .topic(topic) + .subscriptionName("sub2") + .readCompacted(true) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe()) { + Message none = consumer.receive(2, TimeUnit.SECONDS); + assertNull(none); + } + + for (int i = 0; i < messages; i++) { + futures.add(producer.newMessage().key(key).value(testValue0((i + 20 + ""))).sendAsync()); + } + FutureUtil.waitForAll(futures).get(); + + try (Consumer consumer = pulsarClient.newConsumer(schema) + .topic(topic) + .subscriptionName("sub3") + .readCompacted(true) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe()) { + for (int i = 0; i < 10; i++) { + Message received = consumer.receive(); + assertNotNull(received); + assertEquals(received.getKey(), key); + assertEquals(received.getValue().broker(), i + 20 + ""); + consumer.acknowledge(received); + } + Message none = consumer.receive(2, TimeUnit.SECONDS); + assertNull(none); + } + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java index 21792ef38933b..81771126f76ce 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java @@ -188,9 +188,9 @@ private void handleMessage(Message msg) { cur); } - T prev = data.get(key); boolean update = true; if (compactionStrategy != null) { + T prev = data.get(key); update = !compactionStrategy.shouldKeepLeft(prev, cur); }