diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 4bde8e90cfee3..41236093658de 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -146,6 +146,7 @@ import org.apache.pulsar.common.util.ThreadDumpUtil; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.apache.pulsar.compaction.Compactor; +import org.apache.pulsar.compaction.StrategicTwoPhaseCompactor; import org.apache.pulsar.compaction.TwoPhaseCompactor; import org.apache.pulsar.functions.worker.ErrorNotifier; import org.apache.pulsar.functions.worker.WorkerConfig; @@ -198,6 +199,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { private TopicPoliciesService topicPoliciesService = TopicPoliciesService.DISABLED; private BookKeeperClientFactory bkClientFactory; private Compactor compactor; + private StrategicTwoPhaseCompactor strategicCompactor; private ResourceUsageTransportManager resourceUsageTransportManager; private ResourceGroupService resourceGroupServiceManager; @@ -1455,6 +1457,19 @@ public Compactor getNullableCompactor() { return this.compactor; } + public StrategicTwoPhaseCompactor newStrategicCompactor() throws PulsarServerException { + return new StrategicTwoPhaseCompactor(this.getConfiguration(), + getClient(), getBookKeeperClient(), + getCompactorExecutor()); + } + + public synchronized StrategicTwoPhaseCompactor getStrategicCompactor() throws PulsarServerException { + if (this.strategicCompactor == null) { + this.strategicCompactor = newStrategicCompactor(); + } + return this.strategicCompactor; + } + protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImpl offloadPolicies) { if (this.offloaderScheduler == null) { this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder() diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index a476be974a30c..ab978afe340fb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -175,10 +175,11 @@ public synchronized void start() throws PulsarServerException { } tableview = pulsar.getClient().newTableViewBuilder(schema) .topic(TOPIC) - // TODO: enable CompactionStrategy + .loadConf(Map.of( + "topicCompactionStrategyClassName", + ServiceUnitStateCompactionStrategy.class.getName())) .create(); - // TODO: schedule listen instead of foreachAndListen - tableview.forEachAndListen((key, value) -> handle(key, value)); + tableview.listen((key, value) -> handle(key, value)); log.debug("Successfully started the channel tableview."); pulsar.getLocalMetadataStore().registerSessionListener(this::handleMetadataSessionEvent); @@ -332,8 +333,6 @@ private void handle(String serviceUnit, ServiceUnitStateData data) { } ServiceUnitState state = data == null ? Free : data.state(); - - // TODO : Add state validation in tableview by the compaction strategy switch (state) { case Owned -> handleOwnEvent(serviceUnit, data); case Assigned -> handleAssignEvent(serviceUnit, data); @@ -619,14 +618,13 @@ private void doCleanup(String broker) { || StringUtils.equals(broker, stateData.sourceBroker())) { log.info("Cleaning ownership serviceUnit:{}, stateData:{}.", serviceUnit, stateData); tombstoneAsync(serviceUnit).whenComplete((__, e) -> { - if (e == null) { - serviceUnitTombstoneCnt.incrementAndGet(); - } else { + if (e != null) { log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}.", serviceUnit, stateData); serviceUnitTombstoneErrorCnt.incrementAndGet(); } }); + serviceUnitTombstoneCnt.incrementAndGet(); } } @@ -690,14 +688,13 @@ private void monitorOwnerships(List brokers) { serviceUnit, stateData); tombstoneAsync(serviceUnit).whenComplete((__, e) -> { - if (e == null) { - serviceUnitTombstoneCnt.incrementAndGet(); - } else { + if (e != null) { log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}.", serviceUnit, stateData); serviceUnitTombstoneErrorCnt.incrementAndGet(); } }); + serviceUnitTombstoneCnt.incrementAndGet(); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java new file mode 100644 index 0000000000000..41e234be8aef0 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategy.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions.channel; + +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigned; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Free; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Released; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Splitting; +import org.apache.commons.lang.StringUtils; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.schema.JSONSchema; +import org.apache.pulsar.common.topics.TopicCompactionStrategy; + +public class ServiceUnitStateCompactionStrategy implements TopicCompactionStrategy { + + private final Schema schema; + + private boolean checkBrokers = true; + + public ServiceUnitStateCompactionStrategy() { + schema = JSONSchema.of(ServiceUnitStateData.class); + } + + @Override + public Schema getSchema() { + return schema; + } + + public void checkBrokers(boolean check) { + this.checkBrokers = check; + } + + @Override + public boolean shouldKeepLeft(ServiceUnitStateData from, ServiceUnitStateData to) { + ServiceUnitState prevState = from == null ? Free : from.state(); + ServiceUnitState state = to == null ? Free : to.state(); + if (!ServiceUnitState.isValidTransition(prevState, state)) { + return true; + } + + if (checkBrokers) { + if (prevState == Free && (state == Assigned || state == Owned)) { + // Free -> Assigned || Owned broker check + return StringUtils.isBlank(to.broker()); + } else if (prevState == Owned && state == Assigned) { + // Owned -> Assigned(transfer) broker check + return !StringUtils.equals(from.broker(), to.sourceBroker()) + || StringUtils.isBlank(to.broker()) + || StringUtils.equals(from.broker(), to.broker()); + } else if (prevState == Assigned && state == Released) { + // Assigned -> Released(transfer) broker check + return !StringUtils.equals(from.broker(), to.broker()) + || !StringUtils.equals(from.sourceBroker(), to.sourceBroker()); + } else if (prevState == Released && state == Owned) { + // Released -> Owned(transfer) broker check + return !StringUtils.equals(from.broker(), to.broker()) + || !StringUtils.equals(from.sourceBroker(), to.sourceBroker()); + } else if (prevState == Assigned && state == Owned) { + // Assigned -> Owned broker check + return !StringUtils.equals(from.broker(), to.broker()) + || !StringUtils.equals(from.sourceBroker(), to.sourceBroker()); + } else if (prevState == Owned && state == Splitting) { + // Owned -> Splitting broker check + return !StringUtils.equals(from.broker(), to.broker()); + } + } + + return false; + } + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 2066ac2ee2988..3ac42ecd523ad 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -77,6 +77,8 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateCompactionStrategy; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources; import org.apache.pulsar.broker.service.AbstractReplicator; @@ -151,6 +153,7 @@ import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.schema.SchemaData; +import org.apache.pulsar.common.topics.TopicCompactionStrategy; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; @@ -202,6 +205,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal private CompletableFuture currentCompaction = CompletableFuture.completedFuture(COMPACTION_NEVER_RUN); private final CompactedTopic compactedTopic; + // TODO: Create compaction strategy from topic policy when exposing strategic compaction to users. + private static Map strategicCompactionMap = Map.of( + ServiceUnitStateChannelImpl.TOPIC, + new ServiceUnitStateCompactionStrategy()); + private CompletableFuture currentOffload = CompletableFuture.completedFuture( (MessageIdImpl) MessageId.earliest); @@ -1563,6 +1571,8 @@ public void checkCompaction() { } if (backlogEstimate > compactionThreshold) { + log.info("topic:{} backlogEstimate:{} is bigger than compactionThreshold:{}. Triggering compaction", + topic, backlogEstimate, compactionThreshold); try { triggerCompaction(); } catch (AlreadyRunningException are) { @@ -2981,7 +2991,13 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { public synchronized void triggerCompaction() throws PulsarServerException, AlreadyRunningException { if (currentCompaction.isDone()) { - currentCompaction = brokerService.pulsar().getCompactor().compact(topic); + + if (strategicCompactionMap.containsKey(topic)) { + currentCompaction = brokerService.pulsar().getStrategicCompactor() + .compact(topic, strategicCompactionMap.get(topic)); + } else { + currentCompaction = brokerService.pulsar().getCompactor().compact(topic); + } } else { throw new AlreadyRunningException("Compaction already in progress"); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java index bb0850efab4af..9dc4ec649b62b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java @@ -385,6 +385,7 @@ private void phaseTwoLoop(String topic, Iterator> reader, promise.completeExceptionally(e); return; } + outstanding.release(MAX_OUTSTANDING); promise.complete(null); return; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index ad4d0cb2f0b56..84b53bd43a9bd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.loadbalance.extensions.channel; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned; import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MAX_CLEAN_UP_DELAY_TIME_IN_SECS; import static org.apache.pulsar.metadata.api.extended.SessionEvent.ConnectionLost; import static org.apache.pulsar.metadata.api.extended.SessionEvent.Reconnected; @@ -44,6 +45,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -60,6 +62,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.models.Split; import org.apache.pulsar.broker.loadbalance.extensions.models.Unload; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.impl.TableViewImpl; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; @@ -89,6 +92,7 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest { @Override protected void setup() throws Exception { conf.setAllowAutoTopicCreation(true); + conf.setBrokerServiceCompactionMonitorIntervalInSeconds(10); super.internalSetup(conf); admin.tenants().createTenant("pulsar", createDefaultTenantInfo()); @@ -289,8 +293,6 @@ public void assignmentTest() var ownerAddr2 = channel2.getOwnerAsync(bundle).get(); assertEquals(ownerAddr1, ownerAddr2); - // TODO: check conflict resolution - // assertEquals(assignedAddr1, ownerAddr1); assertEquals(getOwnerRequests1.size(), 0); assertEquals(getOwnerRequests2.size(), 0); } @@ -682,6 +684,62 @@ public void handleBrokerDeletionEventTest() true); } + @Test(priority = 10) + public void conflictAndCompactionTest() throws ExecutionException, InterruptedException, TimeoutException, + IllegalAccessException, PulsarClientException, PulsarServerException { + + var producer = (Producer) FieldUtils.readDeclaredField(channel1, "producer", true); + producer.newMessage().key(bundle).send(); + var owner1 = channel1.getOwnerAsync(bundle); + var owner2 = channel2.getOwnerAsync(bundle); + assertNull(owner1.get()); + assertNull(owner2.get()); + + var assigned1 = channel1.publishAssignEventAsync(bundle, lookupServiceAddress1); + assertNotNull(assigned1); + + waitUntilNewOwner(channel1, bundle, lookupServiceAddress1); + waitUntilNewOwner(channel2, bundle, lookupServiceAddress1); + String assignedAddr1 = assigned1.get(5, TimeUnit.SECONDS); + assertEquals(lookupServiceAddress1, assignedAddr1); + + FieldUtils.writeDeclaredField(channel2, + "inFlightStateWaitingTimeInMillis", 3 * 1000, true); + var assigned2 = channel2.publishAssignEventAsync(bundle, lookupServiceAddress2); + assertNotNull(assigned2); + Exception ex = null; + try { + assigned2.join(); + } catch (CompletionException e) { + ex = e; + } + assertNotNull(ex); + assertEquals(TimeoutException.class, ex.getCause().getClass()); + assertEquals(lookupServiceAddress1, channel2.getOwnerAsync(bundle).get()); + assertEquals(lookupServiceAddress1, channel1.getOwnerAsync(bundle).get()); + + var compactor = spy (pulsar1.getStrategicCompactor()); + FieldUtils.writeDeclaredField(pulsar1, "strategicCompactor", compactor, true); + FieldUtils.writeDeclaredField(pulsar2, "strategicCompactor", compactor, true); + Awaitility.await() + .pollInterval(200, TimeUnit.MILLISECONDS) + .atMost(140, TimeUnit.SECONDS) + .untilAsserted(() -> verify(compactor, times(1)) + .compact(eq(ServiceUnitStateChannelImpl.TOPIC), any())); + + var channel3 = new ServiceUnitStateChannelImpl(pulsar1); + channel3.start(); + Awaitility.await() + .pollInterval(200, TimeUnit.MILLISECONDS) + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals( + channel3.getOwnerAsync(bundle).get(), lookupServiceAddress1)); + channel3.close(); + FieldUtils.writeDeclaredField(channel2, + "inFlightStateWaitingTimeInMillis", 30 * 1000, true); + } + + // TODO: add the channel recovery test when broker registry is added. private static ConcurrentOpenHashMap>> getOwnerRequests( @@ -768,7 +826,7 @@ private static void waitUntilNewState(ServiceUnitStateChannel channel, String ke if (actual == null) { return true; } else { - return actual.state() != ServiceUnitState.Owned; + return actual.state() != Owned; } }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategyTest.java new file mode 100644 index 0000000000000..7cf3e927691f9 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateCompactionStrategyTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions.channel; + +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigned; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Free; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Released; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Splitting; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.testng.Assert.assertTrue; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class ServiceUnitStateCompactionStrategyTest { + ServiceUnitStateCompactionStrategy strategy = new ServiceUnitStateCompactionStrategy(); + + ServiceUnitStateData data(ServiceUnitState state) { + return new ServiceUnitStateData(state, "broker"); + } + + ServiceUnitStateData data(ServiceUnitState state, String dst) { + return new ServiceUnitStateData(state, dst, "broker"); + } + ServiceUnitStateData data(ServiceUnitState state, String src, String dst) { + return new ServiceUnitStateData(state, dst, src); + } + + @Test + public void test() throws InterruptedException { + String dst = "dst"; + assertTrue(strategy.shouldKeepLeft(data(Free), data(Free))); + assertFalse(strategy.shouldKeepLeft(data(Free), data(Assigned))); + assertTrue(strategy.shouldKeepLeft(data(Free), data(Assigned, ""))); + assertFalse(strategy.shouldKeepLeft(data(Free), data(Owned))); + assertTrue(strategy.shouldKeepLeft(data(Free), data(Owned, ""))); + assertTrue(strategy.shouldKeepLeft(data(Free), data(Released))); + assertTrue(strategy.shouldKeepLeft(data(Free), data(Splitting))); + + assertFalse(strategy.shouldKeepLeft(data(Assigned), data(Free))); + assertTrue(strategy.shouldKeepLeft(data(Assigned), data(Assigned))); + assertTrue(strategy.shouldKeepLeft(data(Assigned, "dst2"), data(Owned, dst))); + assertTrue(strategy.shouldKeepLeft(data(Assigned, "src1", dst), data(Owned, "src2", dst))); + assertFalse(strategy.shouldKeepLeft(data(Assigned), data(Owned))); + assertTrue(strategy.shouldKeepLeft(data(Assigned, "dst2"), data(Released, dst))); + assertTrue(strategy.shouldKeepLeft(data(Assigned, "src1", dst), data(Released, "src2", dst))); + assertFalse(strategy.shouldKeepLeft(data(Assigned, dst), data(Released, dst))); + assertTrue(strategy.shouldKeepLeft(data(Assigned), data(Splitting))); + + assertFalse(strategy.shouldKeepLeft(data(Owned), data(Free))); + assertTrue(strategy.shouldKeepLeft(data(Owned), data(Assigned))); + assertTrue(strategy.shouldKeepLeft(data(Owned), data(Assigned, ""))); + assertTrue(strategy.shouldKeepLeft(data(Owned), data(Assigned, "src", dst))); + assertFalse(strategy.shouldKeepLeft(data(Owned), data(Assigned, dst))); + assertTrue(strategy.shouldKeepLeft(data(Owned), data(Owned))); + assertTrue(strategy.shouldKeepLeft(data(Owned), data(Released))); + assertTrue(strategy.shouldKeepLeft(data(Owned,"dst2"), data(Splitting, dst))); + assertFalse(strategy.shouldKeepLeft(data(Owned), data(Splitting))); + + assertFalse(strategy.shouldKeepLeft(data(Released), data(Free))); + assertTrue(strategy.shouldKeepLeft(data(Released), data(Assigned))); + assertTrue(strategy.shouldKeepLeft(data(Released, "dst2"), data(Owned, dst))); + assertTrue(strategy.shouldKeepLeft(data(Released, "src1", dst), data(Owned, "src2", dst))); + assertFalse(strategy.shouldKeepLeft(data(Released), data(Owned))); + assertTrue(strategy.shouldKeepLeft(data(Released), data(Released))); + assertTrue(strategy.shouldKeepLeft(data(Released), data(Splitting))); + + assertFalse(strategy.shouldKeepLeft(data(Splitting), data(Free))); + assertTrue(strategy.shouldKeepLeft(data(Splitting), data(Assigned))); + assertTrue(strategy.shouldKeepLeft(data(Splitting), data(Owned))); + assertTrue(strategy.shouldKeepLeft(data(Splitting), data(Released))); + assertTrue(strategy.shouldKeepLeft(data(Splitting), data(Splitting))); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java new file mode 100644 index 0000000000000..6f9708a8ab54a --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java @@ -0,0 +1,927 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Free; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigned; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Splitting; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.isValidTransition; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.buffer.ByteBuf; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import lombok.Cleanup; +import lombok.SneakyThrows; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.commons.lang.reflect.FieldUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateCompactionStrategy; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData; +import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.service.persistent.SystemTopic; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.schema.JSONSchema; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.protocol.Markers; +import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; + +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker-compaction") +public class ServiceUnitStateCompactionTest extends MockedPulsarServiceBaseTest { + private ScheduledExecutorService compactionScheduler; + private BookKeeper bk; + private JSONSchema schema; + private ServiceUnitStateCompactionStrategy strategy; + + private ServiceUnitState testState0 = Free; + private ServiceUnitState testState1 = Free; + private ServiceUnitState testState2 = Free; + private ServiceUnitState testState3 = Free; + private ServiceUnitState testState4 = Free; + + private static Random RANDOM = new Random(); + + + private ServiceUnitStateData testValue(ServiceUnitState state, String broker) { + if (state == Free) { + return null; + } + return new ServiceUnitStateData(state, broker); + } + + private ServiceUnitStateData testValue0(String broker) { + ServiceUnitState to = nextValidState(testState0); + testState0 = to; + return testValue(to, broker); + } + + private ServiceUnitStateData testValue1(String broker) { + ServiceUnitState to = nextValidState(testState1); + testState1 = to; + return testValue(to, broker); + } + + private ServiceUnitStateData testValue2(String broker) { + ServiceUnitState to = nextValidState(testState2); + testState2 = to; + return testValue(to, broker); + } + + private ServiceUnitStateData testValue3(String broker) { + ServiceUnitState to = nextValidState(testState3); + testState3 = to; + return testValue(to, broker); + } + + private ServiceUnitStateData testValue4(String broker) { + ServiceUnitState to = nextValidState(testState4); + testState4 = to; + return testValue(to, broker); + } + + private ServiceUnitState nextValidState(ServiceUnitState from) { + List candidates = Arrays.stream(ServiceUnitState.values()) + .filter(to -> to != Free && to != Splitting && isValidTransition(from, to)) + .collect(Collectors.toList()); + var state= candidates.get(RANDOM.nextInt(candidates.size())); + System.out.println("test state:" + state); + return state; + } + + private ServiceUnitState nextInvalidState(ServiceUnitState from) { + List candidates = Arrays.stream(ServiceUnitState.values()) + .filter(to -> !isValidTransition(from, to)) + .collect(Collectors.toList()); + if (candidates.size() == 0) { + return null; + } + return candidates.get(RANDOM.nextInt(candidates.size())); + } + + private List nextStatesToNull(ServiceUnitState from) { + if (from == null) { + return List.of(); + } + return switch (from) { + case Assigned -> List.of(Owned); + case Owned -> List.of(); + case Splitting -> List.of(); + default -> List.of(); + }; + } + + @BeforeMethod + @Override + public void setup() throws Exception { + super.internalSetup(); + + admin.clusters().createCluster("use", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); + admin.tenants().createTenant("my-property", + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("use"))); + admin.namespaces().createNamespace("my-property/use/my-ns"); + + compactionScheduler = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build()); + bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null); + schema = JSONSchema.of(ServiceUnitStateData.class); + strategy = new ServiceUnitStateCompactionStrategy(); + strategy.checkBrokers(false); + + } + + + @AfterMethod(alwaysRun = true) + @Override + public void cleanup() throws Exception { + super.internalCleanup(); + + if (compactionScheduler != null) { + compactionScheduler.shutdownNow(); + } + } + + + public record TestData( + String topic, + Map expected, + List> all) { + + } + TestData generateTestData() throws PulsarAdminException, PulsarClientException { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + final int numMessages = 20; + final int maxKeys = 5; + + // Configure retention to ensue data is retained for reader + admin.namespaces().setRetention("my-property/use/my-ns", new RetentionPolicies(-1, -1)); + + Producer producer = pulsarClient.newProducer(schema) + .topic(topic) + .enableBatching(true) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + + Map expected = new HashMap<>(); + List> all = new ArrayList<>(); + Random r = new Random(0); + + pulsarClient.newConsumer(schema) + .topic(topic) + .subscriptionName("sub1") + .readCompacted(true) + .subscribe().close(); + + for (int j = 0; j < numMessages; j++) { + int keyIndex = r.nextInt(maxKeys); + String key = "key" + keyIndex; + ServiceUnitStateData expectedData = expected.get(key); + ServiceUnitState state = expectedData == null ? nextValidState(Assigned) : + r.nextInt(2) == 0 ? nextInvalidState(expectedData.state()) : + nextValidState(expectedData.state()); + ServiceUnitStateData value = new ServiceUnitStateData(state, key + ":" + j); + producer.newMessage().key(key).value(value).send(); + ServiceUnitStateData prev = expected.get(key); + if (!strategy.shouldKeepLeft(prev, value)) { + expected.put(key, value); + } + all.add(Pair.of(key, value)); + } + return new TestData(topic, expected, all); + } + + @Test + public void testCompaction() throws Exception { + TestData testData = generateTestData(); + var topic = testData.topic; + var expected = testData.expected; + var all = testData.all; + + StrategicTwoPhaseCompactor compactor + = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic, strategy).get(); + + PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic, false); + // Compacted topic ledger should have same number of entry equals to number of unique key. + //Assert.assertEquals(internalStats.compactedLedger.entries, expected.size()); + Assert.assertTrue(internalStats.compactedLedger.ledgerId > -1); + Assert.assertFalse(internalStats.compactedLedger.offloaded); + + // consumer with readCompacted enabled only get compacted entries + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe()) { + while (true) { + Message m = consumer.receive(2, TimeUnit.SECONDS); + Assert.assertEquals(expected.remove(m.getKey()), m.getValue()); + if (expected.isEmpty()) { + break; + } + } + Assert.assertTrue(expected.isEmpty()); + } + + // can get full backlog if read compacted disabled + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(false).subscribe()) { + while (true) { + Message m = consumer.receive(2, TimeUnit.SECONDS); + Pair expectedMessage = all.remove(0); + Assert.assertEquals(expectedMessage.getLeft(), m.getKey()); + Assert.assertEquals(expectedMessage.getRight(), m.getValue()); + if (all.isEmpty()) { + break; + } + } + Assert.assertTrue(all.isEmpty()); + } + } + + @Test + public void testCompactionWithReader() throws Exception { + TestData testData = generateTestData(); + var topic = testData.topic; + var expected = testData.expected; + var all = testData.all; + + StrategicTwoPhaseCompactor compactor + = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic, strategy).get(); + + // consumer with readCompacted enabled only get compacted entries + try (Reader reader = pulsarClient.newReader(schema).topic(topic).readCompacted(true) + .startMessageId(MessageId.earliest).create()) { + while (true) { + Message m = reader.readNext(2, TimeUnit.SECONDS); + Assert.assertEquals(expected.remove(m.getKey()), m.getValue()); + if (expected.isEmpty()) { + break; + } + } + Assert.assertTrue(expected.isEmpty()); + } + + // can get full backlog if read compacted disabled + try (Reader reader = pulsarClient.newReader(schema).topic(topic).readCompacted(false) + .startMessageId(MessageId.earliest).create()) { + while (true) { + Message m = reader.readNext(2, TimeUnit.SECONDS); + Pair expectedMessage = all.remove(0); + Assert.assertEquals(expectedMessage.getLeft(), m.getKey()); + Assert.assertEquals(expectedMessage.getRight(), m.getValue()); + if (all.isEmpty()) { + break; + } + } + Assert.assertTrue(all.isEmpty()); + } + } + + + @Test + public void testCompactionWithTableview() throws Exception { + var tv = pulsar.getClient().newTableViewBuilder(schema) + .topic("persistent://my-property/use/my-ns/my-topic1") + .loadConf(Map.of( + "topicCompactionStrategyClassName", + ServiceUnitStateCompactionStrategy.class.getName())) + .create(); + + ((ServiceUnitStateCompactionStrategy) + FieldUtils.readDeclaredField(tv, "compactionStrategy", true)) + .checkBrokers(false); + TestData testData = generateTestData(); + var topic = testData.topic; + var expected = testData.expected; + var expectedCopy = new HashMap<>(expected); + + Awaitility.await() + .pollInterval(200, TimeUnit.MILLISECONDS) + .atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals(expectedCopy.size(), tv.size())); + + for(var etr : tv.entrySet()){ + Assert.assertEquals(expectedCopy.remove(etr.getKey()), etr.getValue()); + if (expectedCopy.isEmpty()) { + break; + } + } + + Assert.assertTrue(expectedCopy.isEmpty()); + tv.close();; + + StrategicTwoPhaseCompactor compactor + = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic, strategy).get(); + + // consumer with readCompacted enabled only get compacted entries + var tableview = pulsar.getClient().newTableViewBuilder(schema) + .topic(topic) + .loadConf(Map.of( + "topicCompactionStrategyClassName", + ServiceUnitStateCompactionStrategy.class.getName())) + .create(); + for(var etr : tableview.entrySet()){ + Assert.assertEquals(expected.remove(etr.getKey()), etr.getValue()); + if (expected.isEmpty()) { + break; + } + } + Assert.assertTrue(expected.isEmpty()); + tableview.close(); + + } + + + @Test + public void testReadCompactedBeforeCompaction() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + Producer producer = pulsarClient.newProducer(schema) + .topic(topic) + .enableBatching(true) + .create(); + + pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); + + producer.newMessage().key("key0").value(testValue0( "content0")).send(); + producer.newMessage().key("key0").value(testValue0("content1")).send(); + producer.newMessage().key("key0").value(testValue0( "content2")).send(); + + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe()) { + Message m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getValue().broker(), "content0"); + + m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getValue().broker(), "content1"); + + m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getValue().broker(), "content2"); + } + + StrategicTwoPhaseCompactor compactor + = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic, strategy).get(); + + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe()) { + Message m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getValue().broker(), "content2"); + } + } + + @Test + public void testReadEntriesAfterCompaction() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + Producer producer = pulsarClient.newProducer(schema) + .topic(topic) + .enableBatching(true) + .create(); + + pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); + + producer.newMessage().key("key0").value(testValue0( "content0")).send(); + producer.newMessage().key("key0").value(testValue0("content1")).send(); + producer.newMessage().key("key0").value(testValue0( "content2")).send(); + + StrategicTwoPhaseCompactor compactor + = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic, strategy).get(); + + producer.newMessage().key("key0").value(testValue0("content3")).send(); + + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe()) { + Message m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getValue().broker(), "content2"); + + m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getValue().broker(), "content3"); + } + } + + @Test + public void testSeekEarliestAfterCompaction() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + Producer producer = pulsarClient.newProducer(schema) + .topic(topic) + .enableBatching(true) + .create(); + + producer.newMessage().key("key0").value(testValue0( "content0")).send(); + producer.newMessage().key("key0").value(testValue0("content1")).send(); + producer.newMessage().key("key0").value(testValue0( "content2")).send(); + + StrategicTwoPhaseCompactor compactor + = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic, strategy).get(); + + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe()) { + consumer.seek(MessageId.earliest); + Message m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getValue().broker(), "content2"); + } + + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(false).subscribe()) { + consumer.seek(MessageId.earliest); + + Message m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getValue().broker(), "content0"); + + m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getValue().broker(), "content1"); + + m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getValue().broker(), "content2"); + } + } + + @Test + public void testBrokerRestartAfterCompaction() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + Producer producer = pulsarClient.newProducer(schema) + .topic(topic) + .enableBatching(true) + .create(); + + pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); + + producer.newMessage().key("key0").value(testValue0( "content0")).send(); + producer.newMessage().key("key0").value(testValue0("content1")).send(); + producer.newMessage().key("key0").value(testValue0( "content2")).send(); + + StrategicTwoPhaseCompactor compactor + = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic, strategy).get(); + + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe()) { + Message m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getValue().broker(), "content2"); + } + + stopBroker(); + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe()) { + consumer.receive(); + Assert.fail("Shouldn't have been able to receive anything"); + } catch (PulsarClientException e) { + // correct behaviour + } + startBroker(); + + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe()) { + Message m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getValue().broker(), "content2"); + } + } + + @Test + public void testCompactEmptyTopic() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + Producer producer = pulsarClient.newProducer(schema) + .topic(topic) + .enableBatching(true) + .create(); + + pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); + + StrategicTwoPhaseCompactor compactor + = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic, strategy).get(); + + producer.newMessage().key("key0").value(testValue0( "content0")).send(); + + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe()) { + Message m = consumer.receive(); + Assert.assertEquals(m.getKey(), "key0"); + Assert.assertEquals(m.getValue().broker(), "content0"); + } + } + + @Test + public void testWholeBatchCompactedOut() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + // subscribe before sending anything, so that we get all messages + pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe().close(); + + try (Producer producerNormal = pulsarClient.newProducer(schema).topic(topic) + .enableBatching(true) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + Producer producerBatch = pulsarClient.newProducer(schema).topic(topic) + .maxPendingMessages(3) + .enableBatching(true) + .batchingMaxMessages(3) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create()) { + producerBatch.newMessage().key("key1").value(testValue1("my-message-1")).sendAsync(); + producerBatch.newMessage().key("key1").value(testValue1( "my-message-2")).sendAsync(); + producerBatch.newMessage().key("key1").value(testValue1("my-message-3")).sendAsync(); + producerNormal.newMessage().key("key1").value(testValue1( "my-message-4")).send(); + } + + // compact the topic + StrategicTwoPhaseCompactor compactor + = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic, strategy).get(); + + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic) + .subscriptionName("sub1").readCompacted(true).subscribe()) { + Message message = consumer.receive(); + Assert.assertEquals(message.getKey(), "key1"); + Assert.assertEquals(new String(message.getValue().broker()), "my-message-4"); + } + } + + public void testCompactionWithLastDeletedKey() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + Producer producer = pulsarClient.newProducer(schema).topic(topic).enableBatching(true) + .messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + + pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); + + producer.newMessage().key("1").value(testValue(Owned, "1")).send(); + producer.newMessage().key("2").value(testValue(Owned, "3")).send(); + producer.newMessage().key("3").value(testValue(Owned, "5")).send(); + producer.newMessage().key("1").value(null).send(); + producer.newMessage().key("2").value(null).send(); + + StrategicTwoPhaseCompactor compactor + = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic, strategy).get(); + + Set expected = Sets.newHashSet("3"); + // consumer with readCompacted enabled only get compacted entries + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe()) { + Message m = consumer.receive(2, TimeUnit.SECONDS); + assertTrue(expected.remove(m.getKey())); + } + } + + @Test(timeOut = 20000) + public void testEmptyCompactionLedger() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + Producer producer = pulsarClient.newProducer(schema).topic(topic).enableBatching(true) + .messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + + pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); + + producer.newMessage().key("1").value(testValue(Owned, "1")).send(); + producer.newMessage().key("2").value(testValue(Owned, "3")).send(); + producer.newMessage().key("1").value(null).send(); + producer.newMessage().key("2").value(null).send(); + + StrategicTwoPhaseCompactor compactor + = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic, strategy).get(); + + // consumer with readCompacted enabled only get compacted entries + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe()) { + Message m = consumer.receive(2, TimeUnit.SECONDS); + assertNull(m); + } + } + + @Test(timeOut = 20000) + public void testAllEmptyCompactionLedger() throws Exception { + final String topic = + "persistent://my-property/use/my-ns/testAllEmptyCompactionLedger" + UUID.randomUUID().toString(); + + final int messages = 10; + + // 1.create producer and publish message to the topic. + ProducerBuilder builder = pulsarClient.newProducer(schema).topic(topic); + builder.batchingMaxMessages(messages / 5); + + Producer producer = builder.create(); + + List> futures = new ArrayList<>(messages); + for (int i = 0; i < messages; i++) { + futures.add(producer.newMessage().key("1").value(null).sendAsync()); + } + + FutureUtil.waitForAll(futures).get(); + + // 2.compact the topic. + StrategicTwoPhaseCompactor compactor + = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic, strategy).get(); + + // consumer with readCompacted enabled only get compacted entries + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) { + Message m = consumer.receive(2, TimeUnit.SECONDS); + assertNull(m); + } + } + + @Test(timeOut = 20000) + public void testCompactMultipleTimesWithoutEmptyMessage() + throws PulsarClientException, ExecutionException, InterruptedException { + final String topic = + "persistent://my-property/use/my-ns/testCompactMultipleTimesWithoutEmptyMessage" + UUID.randomUUID() + .toString(); + + final int messages = 10; + final String key = "1"; + + // 1.create producer and publish message to the topic. + ProducerBuilder builder = pulsarClient.newProducer(schema).topic(topic); + builder.enableBatching(true); + + + Producer producer = builder.create(); + + List> futures = new ArrayList<>(messages); + for (int i = 0; i < messages; i++) { + futures.add(producer.newMessage().key(key).value(testValue0((i + ""))).sendAsync()); + } + + FutureUtil.waitForAll(futures).get(); + + // 2.compact the topic. + StrategicTwoPhaseCompactor compactor + = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic, strategy).get(); + + // 3. Send more ten messages + futures.clear(); + for (int i = 0; i < messages; i++) { + futures.add(producer.newMessage().key(key).value(testValue0((i + 10 + ""))).sendAsync()); + } + FutureUtil.waitForAll(futures).get(); + + // 4.compact again. + compactor.compact(topic, strategy).get(); + + try (Consumer consumer = pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1") + .readCompacted(true).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe()) { + Message m1 = consumer.receive(); + assertNotNull(m1); + assertEquals(m1.getKey(), key); + assertEquals(m1.getValue().broker(), "19"); + Message none = consumer.receive(2, TimeUnit.SECONDS); + assertNull(none); + } + } + + @Test(timeOut = 200000) + public void testReadUnCompacted() + throws PulsarClientException, ExecutionException, InterruptedException { + final String topic = "persistent://my-property/use/my-ns/testReadUnCompacted" + UUID.randomUUID().toString(); + + final int messages = 10; + final String key = "1"; + + // 1.create producer and publish message to the topic. + ProducerBuilder builder = pulsarClient.newProducer(schema).topic(topic); + builder.batchingMaxMessages(messages / 5); + + Producer producer = builder.create(); + + List> futures = new ArrayList<>(messages); + for (int i = 0; i < messages; i++) { + futures.add(producer.newMessage().key(key).value(testValue0((i + ""))).sendAsync()); + } + + FutureUtil.waitForAll(futures).get(); + + // 2.compact the topic. + StrategicTwoPhaseCompactor compactor + = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor.compact(topic, strategy).get(); + + // 3. Send more ten messages + futures.clear(); + for (int i = 0; i < messages; i++) { + futures.add(producer.newMessage().key(key).value(testValue0((i + 10 + ""))).sendAsync()); + } + FutureUtil.waitForAll(futures).get(); + try (Consumer consumer = pulsarClient.newConsumer(schema) + .topic(topic) + .subscriptionName("sub1") + .readCompacted(true) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe()) { + for (int i = 0; i < 11; i++) { + Message received = consumer.receive(); + assertNotNull(received); + assertEquals(received.getKey(), key); + assertEquals(received.getValue().broker(), i + 9 + ""); + consumer.acknowledge(received); + } + Message none = consumer.receive(2, TimeUnit.SECONDS); + assertNull(none); + } + + // 4.Send empty message to delete the key-value in the compacted topic. + for (ServiceUnitState state : nextStatesToNull(testState0)) { + producer.newMessage().key(key).value(new ServiceUnitStateData(state, "xx")).send(); + } + producer.newMessage().key(key).value(null).send(); + + // 5.compact the topic. + compactor.compact(topic, strategy).get(); + + try (Consumer consumer = pulsarClient.newConsumer(schema) + .topic(topic) + .subscriptionName("sub2") + .readCompacted(true) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe()) { + Message none = consumer.receive(2, TimeUnit.SECONDS); + assertNull(none); + } + + for (int i = 0; i < messages; i++) { + futures.add(producer.newMessage().key(key).value(testValue0((i + 20 + ""))).sendAsync()); + } + FutureUtil.waitForAll(futures).get(); + + try (Consumer consumer = pulsarClient.newConsumer(schema) + .topic(topic) + .subscriptionName("sub3") + .readCompacted(true) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe()) { + for (int i = 0; i < 10; i++) { + Message received = consumer.receive(); + assertNotNull(received); + assertEquals(received.getKey(), key); + assertEquals(received.getValue().broker(), i + 20 + ""); + consumer.acknowledge(received); + } + Message none = consumer.receive(2, TimeUnit.SECONDS); + assertNull(none); + } + } + + @SneakyThrows + @Test + public void testHealthCheckTopicNotCompacted() { + NamespaceName heartbeatNamespaceV1 = + NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), pulsar.getConfiguration()); + String topicV1 = "persistent://" + heartbeatNamespaceV1.toString() + "/healthcheck"; + NamespaceName heartbeatNamespaceV2 = + NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(), pulsar.getConfiguration()); + String topicV2 = heartbeatNamespaceV2.toString() + "/healthcheck"; + Producer producer1 = pulsarClient.newProducer(schema).topic(topicV1).create(); + Producer producer2 = pulsarClient.newProducer(schema).topic(topicV2).create(); + Optional topicReferenceV1 = pulsar.getBrokerService().getTopic(topicV1, false).join(); + Optional topicReferenceV2 = pulsar.getBrokerService().getTopic(topicV2, false).join(); + assertFalse(((SystemTopic) topicReferenceV1.get()).isCompactionEnabled()); + assertFalse(((SystemTopic) topicReferenceV2.get()).isCompactionEnabled()); + producer1.close(); + producer2.close(); + } + + @Test(timeOut = 60000) + public void testCompactionWithMarker() throws Exception { + String namespace = "my-property/use/my-ns"; + final TopicName dest = TopicName.get( + BrokerTestUtil.newUniqueName("persistent://" + namespace + "/testWriteMarker")); + admin.topics().createNonPartitionedTopic(dest.toString()); + @Cleanup + Consumer consumer = pulsarClient.newConsumer(schema) + .topic(dest.toString()) + .subscriptionName("test-compaction-sub") + .subscriptionType(SubscriptionType.Exclusive) + .readCompacted(true) + .subscriptionInitialPosition(SubscriptionInitialPosition.Latest) + .subscribe(); + @Cleanup + Producer producer = pulsarClient.newProducer(schema) + .topic(dest.toString()) + .enableBatching(true) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + producer.newMessage().value(testValue0(("msg-1"))).send(); + Optional topic = pulsar.getBrokerService().getTopic(dest.toString(), true).join(); + Assert.assertTrue(topic.isPresent()); + PersistentTopic persistentTopic = (PersistentTopic) topic.get(); + Random random = new Random(); + for (int i = 0; i < 100; i++) { + int rad = random.nextInt(3); + ByteBuf marker; + if (rad == 0) { + marker = Markers.newTxnCommitMarker(-1L, 0, i); + } else if (rad == 1) { + marker = Markers.newTxnAbortMarker(-1L, 0, i); + } else { + marker = Markers.newReplicatedSubscriptionsSnapshotRequest(UUID.randomUUID().toString(), "r1"); + } + persistentTopic.getManagedLedger().asyncAddEntry(marker, new AsyncCallbacks.AddEntryCallback() { + @Override + public void addComplete(Position position, ByteBuf entryData, Object ctx) { + // + } + + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + // + } + }, null); + marker.release(); + } + producer.newMessage().value(testValue0(("msg-2"))).send(); + admin.topics().triggerCompaction(dest.toString()); + Awaitility.await() + .atMost(50, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .untilAsserted(() -> { + long ledgerId = admin.topics().getInternalStats(dest.toString()).compactedLedger.ledgerId; + Assert.assertNotEquals(ledgerId, -1L); + }); + } +}