encryptionKeys;
+ CryptoKeyReader cryptoKeyReader;
+ public RawBatchMessageContainerImpl(int maxNumMessagesInBatch) {
+ super();
+ compressionType = CompressionType.NONE;
+ compressor = new CompressionCodecNone();
+ if (maxNumMessagesInBatch > 0) {
+ this.maxNumMessagesInBatch = maxNumMessagesInBatch;
+ }
+ }
+ private ByteBuf encrypt(ByteBuf compressedPayload) {
+ if (msgCrypto == null) {
+ return compressedPayload;
+ }
+ int maxSize = msgCrypto.getMaxOutputSize(compressedPayload.readableBytes());
+ ByteBuf encryptedPayload = allocator.buffer(maxSize);
+ ByteBuffer targetBuffer = encryptedPayload.nioBuffer(0, maxSize);
+
+ try {
+ msgCrypto.encrypt(encryptionKeys, cryptoKeyReader, () -> messageMetadata,
+ compressedPayload.nioBuffer(), targetBuffer);
+ } catch (PulsarClientException e) {
+ throw new RuntimeException("Failed to encrypt payload", e);
+ }
+ encryptedPayload.writerIndex(targetBuffer.remaining());
+ compressedPayload.release();
+ return encryptedPayload;
+ }
+
+ @Override
+ public ProducerImpl.OpSendMsg createOpSendMsg() {
+ throw new UnsupportedOperationException();
+ }
+ public void setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
+ this.cryptoKeyReader = cryptoKeyReader;
+ }
+
+ public ByteBuf toByteBuf() {
+ if (numMessagesInBatch > 1) {
+ messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
+ messageMetadata.setSequenceId(lowestSequenceId);
+ messageMetadata.setHighestSequenceId(highestSequenceId);
+ }
+ MessageImpl lastMessage = messages.get(messages.size() - 1);
+ MessageIdImpl lastMessageId = (MessageIdImpl) lastMessage.getMessageId();
+ MessageMetadata lastMessageMetadata = lastMessage.getMessageBuilder();
+
+ this.compressionType = lastMessageMetadata.getCompression();
+ this.compressor = CompressionCodecProvider.getCompressionCodec(lastMessageMetadata.getCompression());
+
+ if (!lastMessage.getEncryptionCtx().isEmpty()) {
+ EncryptionContext encryptionContext = (EncryptionContext) lastMessage.getEncryptionCtx().get();
+
+ if (cryptoKeyReader == null) {
+ throw new IllegalStateException("Messages are encrypted but no cryptoKeyReader is provided.");
+ }
+
+ this.encryptionKeys = encryptionContext.getKeys().keySet();
+ this.msgCrypto =
+ new MessageCryptoBc(String.format(
+ "[%s] [%s]", topicName, "RawBatchMessageContainer"), true);
+ try {
+ msgCrypto.addPublicKeyCipher(encryptionKeys, cryptoKeyReader);
+ } catch (PulsarClientException.CryptoException e) {
+ throw new IllegalArgumentException("Failed to set encryption keys", e);
+ }
+ }
+
+ ByteBuf encryptedPayload = encrypt(getCompressedBatchMetadataAndPayload());
+ updateAndReserveBatchAllocatedSize(encryptedPayload.capacity());
+ ByteBuf metadataAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
+ messageMetadata, encryptedPayload);
+
+ MessageIdData idData = new MessageIdData();
+ idData.setLedgerId(lastMessageId.getLedgerId());
+ idData.setEntryId(lastMessageId.getEntryId());
+ idData.setPartition(lastMessageId.getPartitionIndex());
+
+ // Format: [IdSize][Id][metadataAndPayloadSize][metadataAndPayload]
+ // Following RawMessage.serialize() format as the compacted messages will be parsed as RawMessage in broker
+ int idSize = idData.getSerializedSize();
+ int headerSize = 4 /* IdSize */ + idSize + 4 /* metadataAndPayloadSize */;
+ int totalSize = headerSize + metadataAndPayload.readableBytes();
+ ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(totalSize);
+ buf.writeInt(idSize);
+ idData.writeTo(buf);
+ buf.writeInt(metadataAndPayload.readableBytes());
+ buf.writeBytes(metadataAndPayload);
+ encryptedPayload.release();
+ clear();
+ return buf;
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java
index 522618f9d6a399..e93a642c76e4df 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java
@@ -40,8 +40,8 @@ public abstract class Compactor {
protected final ServiceConfiguration conf;
protected final ScheduledExecutorService scheduler;
- private final PulsarClient pulsar;
- private final BookKeeper bk;
+ protected final PulsarClient pulsar;
+ protected final BookKeeper bk;
protected final CompactorMXBeanImpl mxBean;
public Compactor(ServiceConfiguration conf,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
new file mode 100644
index 00000000000000..4c264360ea883a
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import io.netty.buffer.ByteBuf;
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.CompactionReaderImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.RawBatchMessageContainerImpl;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Compaction will go through the topic in two passes. The first pass
+ * selects valid message(defined in the TopicCompactionStrategy.isValid())
+ * for each key in the topic. Then, the second pass writes these values
+ * to a ledger.
+ *
+ * As the first pass caches the entire message(not just offset) for each key into a map,
+ * this compaction could be memory intensive if the message payload is large.
+ */
+public class StrategicTwoPhaseCompactor extends TwoPhaseCompactor {
+ private static final Logger log = LoggerFactory.getLogger(StrategicTwoPhaseCompactor.class);
+ private static final int MAX_OUTSTANDING = 500;
+ private final Duration phaseOneLoopReadTimeout;
+ private final RawBatchMessageContainerImpl batchMessageContainer;
+
+ public StrategicTwoPhaseCompactor(ServiceConfiguration conf,
+ PulsarClient pulsar,
+ BookKeeper bk,
+ ScheduledExecutorService scheduler,
+ int maxNumMessagesInBatch) {
+ super(conf, pulsar, bk, scheduler);
+ batchMessageContainer = new RawBatchMessageContainerImpl(maxNumMessagesInBatch);
+ phaseOneLoopReadTimeout = Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
+ }
+
+ public StrategicTwoPhaseCompactor(ServiceConfiguration conf,
+ PulsarClient pulsar,
+ BookKeeper bk,
+ ScheduledExecutorService scheduler) {
+ this(conf, pulsar, bk, scheduler, -1);
+ }
+
+ public CompletableFuture compact(String topic) {
+ throw new UnsupportedOperationException();
+ }
+
+
+ public CompletableFuture compact(String topic,
+ TopicCompactionStrategy strategy) {
+ return compact(topic, strategy, null);
+ }
+
+ public CompletableFuture compact(String topic,
+ TopicCompactionStrategy strategy,
+ CryptoKeyReader cryptoKeyReader) {
+ CompletableFuture> consumerFuture = new CompletableFuture<>();
+ if (cryptoKeyReader != null) {
+ batchMessageContainer.setCryptoKeyReader(cryptoKeyReader);
+ }
+ CompactionReaderImpl reader = CompactionReaderImpl.create(
+ (PulsarClientImpl) pulsar, strategy.getSchema(), topic, consumerFuture, cryptoKeyReader);
+
+ return consumerFuture.thenComposeAsync(__ -> compactAndCloseReader(reader, strategy), scheduler);
+ }
+
+ CompletableFuture doCompaction(Reader reader, TopicCompactionStrategy strategy) {
+
+ if (!(reader instanceof CompactionReaderImpl)) {
+ return CompletableFuture.failedFuture(
+ new IllegalStateException("reader has to be DelayedAckReaderImpl"));
+ }
+ return reader.hasMessageAvailableAsync()
+ .thenCompose(available -> {
+ if (available) {
+ return phaseOne(reader, strategy)
+ .thenCompose((result) -> phaseTwo(result, reader, bk));
+ } else {
+ log.info("Skip compaction of the empty topic {}", reader.getTopic());
+ return CompletableFuture.completedFuture(-1L);
+ }
+ });
+ }
+
+ CompletableFuture compactAndCloseReader(Reader reader, TopicCompactionStrategy strategy) {
+ CompletableFuture promise = new CompletableFuture<>();
+ mxBean.addCompactionStartOp(reader.getTopic());
+ doCompaction(reader, strategy).whenComplete(
+ (ledgerId, exception) -> {
+ log.info("Completed doCompaction ledgerId:{}", ledgerId);
+ reader.closeAsync().whenComplete((v, exception2) -> {
+ if (exception2 != null) {
+ log.warn("Error closing reader handle {}, ignoring", reader, exception2);
+ }
+ if (exception != null) {
+ // complete with original exception
+ mxBean.addCompactionEndOp(reader.getTopic(), false);
+ promise.completeExceptionally(exception);
+ } else {
+ mxBean.addCompactionEndOp(reader.getTopic(), true);
+ promise.complete(ledgerId);
+ }
+ });
+ });
+ return promise;
+ }
+
+ private boolean doCompactMessage(Message msg, PhaseOneResult result, TopicCompactionStrategy strategy) {
+ Map> cache = result.cache;
+ String key = msg.getKey();
+
+ if (key == null) {
+ msg.release();
+ return true;
+ }
+ T val = msg.getValue();
+ Message prev = cache.get(key);
+ T prevVal = prev == null ? null : prev.getValue();
+
+ if (strategy.isValid(prevVal, val)) {
+ if (val != null && msg.size() > 0) {
+ cache.remove(key); // to reorder
+ cache.put(key, msg);
+ } else {
+ cache.remove(key);
+ msg.release();
+ }
+
+ if (prev != null) {
+ prev.release();
+ }
+
+ result.validCompactionCount.incrementAndGet();
+ return true;
+ } else {
+ msg.release();
+ result.invalidCompactionCount.incrementAndGet();
+ return false;
+ }
+
+ }
+
+ private static class PhaseOneResult {
+ MessageId firstId;
+ //MessageId to; // last undeleted messageId
+ MessageId lastId; // last read messageId
+ Map> cache;
+
+ AtomicInteger invalidCompactionCount;
+
+ AtomicInteger validCompactionCount;
+
+ AtomicInteger numReadMessages;
+
+ String topic;
+
+ PhaseOneResult(String topic) {
+ this.topic = topic;
+ cache = new LinkedHashMap<>();
+ invalidCompactionCount = new AtomicInteger();
+ validCompactionCount = new AtomicInteger();
+ numReadMessages = new AtomicInteger();
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "{Topic:%s, firstId:%s, lastId:%s, cache.size:%d, "
+ + "invalidCompactionCount:%d, validCompactionCount:%d, numReadMessages:%d}",
+ topic,
+ firstId != null ? firstId.toString() : "",
+ lastId != null ? lastId.toString() : "",
+ cache.size(),
+ invalidCompactionCount.get(),
+ validCompactionCount.get(),
+ numReadMessages.get());
+ }
+ }
+
+
+ private CompletableFuture phaseOne(Reader reader, TopicCompactionStrategy strategy) {
+ CompletableFuture promise = new CompletableFuture<>();
+ PhaseOneResult result = new PhaseOneResult(reader.getTopic());
+
+ ((CompactionReaderImpl) reader).getLastMessageIdAsync()
+ .thenAccept(lastMessageId -> {
+ log.info("Commencing phase one of compaction for {}, reading to {}",
+ reader.getTopic(), lastMessageId);
+ result.lastId = copyMessageId(lastMessageId);
+ phaseOneLoop(reader, promise, result, strategy);
+ }).exceptionally(ex -> {
+ promise.completeExceptionally(ex);
+ return null;
+ });
+
+ return promise;
+
+ }
+
+ private static MessageId copyMessageId(MessageId msgId) {
+ if (msgId instanceof BatchMessageIdImpl) {
+ BatchMessageIdImpl tempId = (BatchMessageIdImpl) msgId;
+ return new BatchMessageIdImpl(tempId);
+ } else if (msgId instanceof MessageIdImpl) {
+ MessageIdImpl tempId = (MessageIdImpl) msgId;
+ return new MessageIdImpl(tempId.getLedgerId(), tempId.getEntryId(),
+ tempId.getPartitionIndex());
+ } else {
+ throw new IllegalStateException("Unknown lastMessageId type");
+ }
+ }
+
+ private void phaseOneLoop(Reader reader, CompletableFuture promise,
+ PhaseOneResult result, TopicCompactionStrategy strategy) {
+
+ if (promise.isDone()) {
+ return;
+ }
+
+ CompletableFuture> future = reader.readNextAsync();
+ FutureUtil.addTimeoutHandling(future,
+ phaseOneLoopReadTimeout, scheduler,
+ () -> FutureUtil.createTimeoutException("Timeout", getClass(),
+ "phaseOneLoop(...)"));
+
+ future.thenAcceptAsync(msg -> {
+
+ MessageId id = msg.getMessageId();
+ boolean completed = false;
+ if (result.lastId.compareTo(id) == 0) {
+ completed = true;
+ }
+
+ result.numReadMessages.incrementAndGet();
+ mxBean.addCompactionReadOp(reader.getTopic(), msg.size());
+ if (doCompactMessage(msg, result, strategy)) {
+ mxBean.addCompactionRemovedEvent(reader.getTopic());
+ }
+ //set ids in the result
+ if (result.firstId == null) {
+ result.firstId = copyMessageId(id);
+ log.info("Resetting cursor to firstId:{}", result.firstId);
+ try {
+ reader.seek(result.firstId);
+ } catch (PulsarClientException e) {
+ throw new RuntimeException("Failed to reset the cursor to firstId:" + result.firstId, e);
+ }
+ }
+ if (completed) {
+ promise.complete(result);
+ } else {
+ phaseOneLoop(reader, promise, result, strategy);
+ }
+
+ }, scheduler).exceptionally(ex -> {
+ promise.completeExceptionally(ex);
+ return null;
+ });
+
+ }
+
+ private CompletableFuture phaseTwo(PhaseOneResult phaseOneResult, Reader reader, BookKeeper bk) {
+ log.info("Completed phase one. Result:{}. ", phaseOneResult);
+ Map metadata =
+ LedgerMetadataUtils.buildMetadataForCompactedLedger(
+ phaseOneResult.topic, phaseOneResult.lastId.toByteArray());
+ return createLedger(bk, metadata)
+ .thenCompose((ledger) -> {
+ log.info(
+ "Commencing phase two of compaction for {}, from {} to {}, compacting {} keys to ledger {}",
+ phaseOneResult.topic, phaseOneResult.firstId, phaseOneResult.lastId,
+ phaseOneResult.cache.size(), ledger.getId());
+ return runPhaseTwo(phaseOneResult, reader, ledger, bk);
+ });
+ }
+
+ private CompletableFuture runPhaseTwo(
+ PhaseOneResult phaseOneResult, Reader reader, LedgerHandle ledger, BookKeeper bk) {
+ CompletableFuture promise = new CompletableFuture<>();
+ Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
+ CompletableFuture loopPromise = new CompletableFuture<>();
+ phaseTwoLoop(phaseOneResult.topic, phaseOneResult.cache.values().iterator(), ledger,
+ outstanding, loopPromise);
+ loopPromise.thenCompose((v) -> {
+ log.info("Flushing batch container numMessagesInBatch:{}",
+ batchMessageContainer.getNumMessagesInBatch());
+ return addToCompactedLedger(ledger, null, reader.getTopic(), outstanding)
+ .whenComplete((res, exception2) -> {
+ if (exception2 != null) {
+ promise.completeExceptionally(exception2);
+ return;
+ }
+ });
+ })
+ .thenCompose(v -> {
+ log.info("Acking ledger id {}", phaseOneResult.firstId);
+ return ((CompactionReaderImpl) reader)
+ .acknowledgeCumulativeAsync(
+ phaseOneResult.lastId, Map.of(COMPACTED_TOPIC_LEDGER_PROPERTY,
+ ledger.getId()));
+ })
+ .thenCompose((v) -> closeLedger(ledger))
+ .whenComplete((v, exception) -> {
+ if (exception != null) {
+ deleteLedger(bk, ledger).whenComplete((res2, exception2) -> {
+ if (exception2 != null) {
+ log.error("Cleanup of ledger {} for failed", ledger, exception2);
+ }
+ // complete with original exception
+ promise.completeExceptionally(exception);
+ });
+ } else {
+ log.info("kept ledger:{}", ledger.getId());
+ promise.complete(ledger.getId());
+ }
+ });
+
+ return promise;
+ }
+
+ private void phaseTwoLoop(String topic, Iterator> reader,
+ LedgerHandle lh, Semaphore outstanding,
+ CompletableFuture promise) {
+ if (promise.isDone()) {
+ return;
+ }
+ CompletableFuture.runAsync(() -> {
+ if (reader.hasNext()) {
+ Message message = reader.next();
+ mxBean.addCompactionReadOp(topic, message.size());
+ addToCompactedLedger(lh, message, topic, outstanding)
+ .whenComplete((res, exception2) -> {
+ outstanding.release();
+ if (exception2 != null) {
+ promise.completeExceptionally(exception2);
+ return;
+ }
+ });
+ phaseTwoLoop(topic, reader, lh, outstanding, promise);
+ } else {
+ try {
+ outstanding.acquire(MAX_OUTSTANDING);
+ } catch (InterruptedException e) {
+ promise.completeExceptionally(e);
+ return;
+ }
+ promise.complete(null);
+ return;
+ }
+
+ }, scheduler)
+ .exceptionally(ex -> {
+ promise.completeExceptionally(ex);
+ return null;
+ });
+ }
+
+ CompletableFuture addToCompactedLedger(
+ LedgerHandle lh, Message m, String topic, Semaphore outstanding) {
+ CompletableFuture bkf = new CompletableFuture<>();
+ if (m == null || batchMessageContainer.add((MessageImpl>) m, null)) {
+ if (batchMessageContainer.getNumMessagesInBatch() > 0) {
+ ByteBuf serialized = batchMessageContainer.toByteBuf();
+ try {
+ outstanding.acquire();
+ mxBean.addCompactionWriteOp(topic, serialized.readableBytes());
+ long start = System.nanoTime();
+ lh.asyncAddEntry(serialized,
+ (rc, ledger, eid, ctx) -> {
+ outstanding.release();
+ mxBean.addCompactionLatencyOp(topic, System.nanoTime() - start, TimeUnit.NANOSECONDS);
+ if (rc != BKException.Code.OK) {
+ bkf.completeExceptionally(BKException.create(rc));
+ } else {
+ bkf.complete(true);
+ }
+ }, null);
+
+ } catch (Throwable t) {
+ return FutureUtil.failedFuture(t);
+ }
+ } else {
+ bkf.complete(false);
+ }
+ } else {
+ bkf.complete(false);
+ }
+ return bkf;
+ }
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 84107190fd305e..821dd9c0c9d23c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -59,7 +59,7 @@
public class TwoPhaseCompactor extends Compactor {
private static final Logger log = LoggerFactory.getLogger(TwoPhaseCompactor.class);
private static final int MAX_OUTSTANDING = 500;
- private static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger";
+ protected static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger";
private final Duration phaseOneLoopReadTimeout;
public TwoPhaseCompactor(ServiceConfiguration conf,
@@ -309,7 +309,7 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map
});
}
- private CompletableFuture createLedger(BookKeeper bk, Map metadata) {
+ protected CompletableFuture createLedger(BookKeeper bk, Map metadata) {
CompletableFuture bkf = new CompletableFuture<>();
try {
@@ -332,7 +332,7 @@ private CompletableFuture createLedger(BookKeeper bk, Map deleteLedger(BookKeeper bk, LedgerHandle lh) {
+ protected CompletableFuture deleteLedger(BookKeeper bk, LedgerHandle lh) {
CompletableFuture bkf = new CompletableFuture<>();
try {
bk.asyncDeleteLedger(lh.getId(),
@@ -349,7 +349,7 @@ private CompletableFuture deleteLedger(BookKeeper bk, LedgerHandle lh) {
return bkf;
}
- private CompletableFuture closeLedger(LedgerHandle lh) {
+ protected CompletableFuture closeLedger(LedgerHandle lh) {
CompletableFuture bkf = new CompletableFuture<>();
try {
lh.asyncClose((rc, ledger, ctx) -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactionReaderImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactionReaderImplTest.java
new file mode 100644
index 00000000000000..2021784313e76b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactionReaderImplTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import com.google.common.collect.Sets;
+import java.util.concurrent.CompletableFuture;
+import lombok.Cleanup;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-impl")
+public class CompactionReaderImplTest extends MockedPulsarServiceBaseTest {
+
+ @BeforeMethod
+ @Override
+ public void setup() throws Exception {
+ super.internalSetup();
+ admin.clusters().createCluster("test",
+ ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+ admin.tenants().createTenant("my-property",
+ new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+ admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ public void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void test() throws Exception {
+
+ String topic = "persistent://my-property/my-ns/my-compact-topic";
+ int numKeys = 5;
+ @Cleanup
+ Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+ for (int i = 0; i < numKeys; i++) {
+ producer.newMessage().key("key:" + i).value("value" + i).send();
+ }
+
+ @Cleanup
+ CompactionReaderImpl reader = CompactionReaderImpl
+ .create((PulsarClientImpl) pulsarClient, Schema.STRING, topic, new CompletableFuture(), null);
+
+ ConsumerBase consumerBase = spy(reader.getConsumer());
+ org.apache.commons.lang3.reflect.FieldUtils.writeDeclaredField(
+ reader, "consumer", consumerBase, true);
+
+ ReaderConfigurationData readerConfigurationData =
+ (ReaderConfigurationData) FieldUtils.readDeclaredField(
+ reader, "readerConfiguration", true);
+
+
+ ReaderConfigurationData expected = new ReaderConfigurationData<>();
+ expected.setTopicName(topic);
+ expected.setSubscriptionName(COMPACTION_SUBSCRIPTION);
+ expected.setStartMessageId(MessageId.earliest);
+ expected.setStartMessageFromRollbackDurationInSec(0);
+ expected.setReadCompacted(true);
+ expected.setSubscriptionMode(SubscriptionMode.Durable);
+ expected.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+
+ MessageIdImpl lastMessageId = (MessageIdImpl) reader.getLastMessageIdAsync().get();
+ MessageIdImpl id = null;
+ MessageImpl m = null;
+
+ Assert.assertEquals(readerConfigurationData, expected);
+ for (int i = 0; i < numKeys; i++) {
+ m = (MessageImpl) reader.readNextAsync().get();
+ id = (MessageIdImpl) m.getMessageId();
+ }
+ Assert.assertEquals(id, lastMessageId);
+ verify(consumerBase, times(0))
+ .acknowledgeCumulativeAsync(Mockito.any(MessageId.class));
+
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
index 4f8e08a5f8e22a..7b9a25a06251f0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
@@ -152,6 +152,14 @@ public void testBatchMessageOOMMemoryRelease() throws Exception {
}).when(mockAllocator).buffer(anyInt());
final BatchMessageContainerImpl batchMessageContainer = new BatchMessageContainerImpl(mockAllocator);
+ /* Without `batchMessageContainer.setProducer(producer);` it throws NPE since producer is null, and
+ eventually sendAsync() catches this NPE and releases the memory and semaphore.
+ } catch (Throwable t) {
+ completeCallbackAndReleaseSemaphore(uncompressedSize, callback,
+ new PulsarClientException(t, msg.getSequenceId()));
+ }
+ */
+ batchMessageContainer.setProducer(producer);
Field batchMessageContainerField = ProducerImpl.class.getDeclaredField("batchMessageContainer");
batchMessageContainerField.setAccessible(true);
batchMessageContainerField.set(spyProducer, batchMessageContainer);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
index 1d4f8c9b67471b..2f8cb655401d90 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
@@ -305,6 +305,14 @@ public void testBatchMessageOOMProducerSemaphoreRelease() throws Exception {
Field batchMessageContainerField = ProducerImpl.class.getDeclaredField("batchMessageContainer");
batchMessageContainerField.setAccessible(true);
batchMessageContainerField.set(spyProducer, batchMessageContainer);
+ /* Without `batchMessageContainer.setProducer(producer);` it throws NPE since producer is null, and
+ eventually sendAsync() catches this NPE and releases the memory and semaphore.
+ } catch (Throwable t) {
+ completeCallbackAndReleaseSemaphore(uncompressedSize, callback,
+ new PulsarClientException(t, msg.getSequenceId()));
+ }
+ */
+ batchMessageContainer.setProducer(producer);
spyProducer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
Assert.fail("can not reach here");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java
new file mode 100644
index 00000000000000..1bb489cadaf69d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImplTest.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+
+import static org.apache.pulsar.common.api.proto.CompressionType.LZ4;
+import static org.apache.pulsar.common.api.proto.CompressionType.NONE;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.EncryptionKeyInfo;
+import org.apache.pulsar.client.api.MessageCrypto;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.CompressionType;
+import org.apache.pulsar.common.api.proto.MessageIdData;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.compaction.CompactionTest;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class RawBatchMessageContainerImplTest {
+ CompressionType compressionType;
+ MessageCrypto msgCrypto;
+ CryptoKeyReader cryptoKeyReader;
+ Map encryptKeys;
+
+ public void setEncryptionAndCompression(boolean encrypt, boolean compress) {
+ if(compress){
+ compressionType = LZ4;
+ } else {
+ compressionType = NONE;
+ }
+
+ if(encrypt) {
+ cryptoKeyReader = new CompactionTest.EncKeyReader();
+ msgCrypto = new MessageCryptoBc("test", false);
+ String key = "client-ecdsa.pem";
+ EncryptionKeyInfo publicKeyInfo = cryptoKeyReader.getPublicKey(key, null);
+ encryptKeys = Map.of(
+ key, new EncryptionContext.EncryptionKey(publicKeyInfo.getKey(), publicKeyInfo.getMetadata()));
+ } else {
+ compressionType = null;
+ msgCrypto = null;
+ cryptoKeyReader = null;
+ encryptKeys = null;
+ }
+ }
+
+ public MessageImpl createMessage(String topic, String value, int entryId) {
+ MessageMetadata metadata = new MessageMetadata()
+ .setPublishTime(System.currentTimeMillis())
+ .setProducerName("test")
+ .setSequenceId(entryId);
+
+ MessageIdImpl id = new MessageIdImpl(0, entryId, -1);
+
+ if (compressionType != null) {
+ metadata.setCompression(compressionType);
+ }
+ Optional encryptionContext = null;
+ if(encryptKeys != null) {
+ EncryptionContext tmp = new EncryptionContext();
+ tmp.setKeys(encryptKeys);
+ encryptionContext = Optional.of(tmp);
+ } else {
+ encryptionContext = Optional.empty();
+ }
+ ByteBuf payload = Unpooled.copiedBuffer(value.getBytes());
+ return new MessageImpl(topic, id,metadata, payload, encryptionContext, null, Schema.STRING);
+ }
+
+
+ @BeforeMethod
+ public void setup() throws Exception {
+ setEncryptionAndCompression(false, false);
+ }
+ @Test
+ public void testToByteBuf() throws IOException {
+ RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(2);
+ String topic = "my-topic";
+ container.add(createMessage(topic, "hi-1", 0), null);
+ container.add(createMessage(topic, "hi-2", 1), null);
+ ByteBuf buf = container.toByteBuf();
+
+
+ int idSize = buf.readInt();
+ ByteBuf idBuf = buf.readBytes(idSize);
+ MessageIdData idData = new MessageIdData();
+ idData.parseFrom(idBuf, idSize);
+ Assert.assertEquals(idData.getLedgerId(), 0);
+ Assert.assertEquals(idData.getEntryId(), 1);
+ Assert.assertEquals(idData.getPartition(), -1);
+
+
+ int metadataAndPayloadSize = buf.readInt();
+ ByteBuf metadataAndPayload = buf.readBytes(metadataAndPayloadSize);
+ MessageImpl singleMessageMetadataAndPayload = MessageImpl.deserialize(metadataAndPayload);
+ MessageMetadata metadata = singleMessageMetadataAndPayload.getMessageBuilder();
+ Assert.assertEquals(metadata.getNumMessagesInBatch(), 2);
+ Assert.assertEquals(metadata.getHighestSequenceId(), 1);
+ Assert.assertEquals(metadata.getCompression(), NONE);
+
+ SingleMessageMetadata messageMetadata = new SingleMessageMetadata();
+ ByteBuf payload1 = Commands.deSerializeSingleMessageInBatch(
+ singleMessageMetadataAndPayload.getPayload(), messageMetadata, 0, 2);
+ ByteBuf payload2 = Commands.deSerializeSingleMessageInBatch(
+ singleMessageMetadataAndPayload.getPayload(), messageMetadata, 1, 2);
+
+ Assert.assertEquals(payload1.toString(Charset.defaultCharset()), "hi-1");
+ Assert.assertEquals(payload2.toString(Charset.defaultCharset()), "hi-2");
+ payload1.release();
+ payload2.release();
+ singleMessageMetadataAndPayload.release();
+ metadataAndPayload.release();
+ buf.release();
+ }
+
+ @Test
+ public void testToByteBufWithCompressionAndEncryption() throws IOException {
+ setEncryptionAndCompression(true, true);
+
+ RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(2);
+ container.setCryptoKeyReader(cryptoKeyReader);
+ String topic = "my-topic";
+ container.add(createMessage(topic, "hi-1", 0), null);
+ container.add(createMessage(topic, "hi-2", 1), null);
+ ByteBuf buf = container.toByteBuf();
+
+ int idSize = buf.readInt();
+ ByteBuf idBuf = buf.readBytes(idSize);
+ MessageIdData idData = new MessageIdData();
+ idData.parseFrom(idBuf, idSize);
+ Assert.assertEquals(idData.getLedgerId(), 0);
+ Assert.assertEquals(idData.getEntryId(), 1);
+ Assert.assertEquals(idData.getPartition(), -1);
+
+ int metadataAndPayloadSize = buf.readInt();
+ ByteBuf metadataAndPayload = buf.readBytes(metadataAndPayloadSize);
+ MessageImpl singleMessageMetadataAndPayload = MessageImpl.deserialize(metadataAndPayload);
+
+ MessageMetadata metadata = singleMessageMetadataAndPayload.getMessageBuilder();
+ Assert.assertEquals(metadata.getNumMessagesInBatch(), 2);
+ Assert.assertEquals(metadata.getHighestSequenceId(), 1);
+ Assert.assertEquals(metadata.getCompression(), compressionType);
+
+ ByteBuf payload = singleMessageMetadataAndPayload.getPayload();
+ int maxDecryptedSize = msgCrypto.getMaxOutputSize(payload.readableBytes());
+ ByteBuffer decrypted = ByteBuffer.allocate(maxDecryptedSize);
+ msgCrypto.decrypt(() -> metadata, payload.nioBuffer(), decrypted, cryptoKeyReader);
+ CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);
+ ByteBuf uncompressed = codec.decode(Unpooled.wrappedBuffer(decrypted),
+ metadata.getUncompressedSize());
+ SingleMessageMetadata messageMetadata = new SingleMessageMetadata();
+
+ ByteBuf payload1 = Commands.deSerializeSingleMessageInBatch(
+ uncompressed, messageMetadata, 0, 2);
+ ByteBuf payload2 = Commands.deSerializeSingleMessageInBatch(
+ uncompressed, messageMetadata, 1, 2);
+
+ Assert.assertEquals(payload1.toString(Charset.defaultCharset()), "hi-1");
+ Assert.assertEquals(payload2.toString(Charset.defaultCharset()), "hi-2");
+ payload1.release();
+ payload2.release();
+ singleMessageMetadataAndPayload.release();
+ metadataAndPayload.release();
+ uncompressed.release();
+ buf.release();
+ }
+
+ @Test
+ public void testToByteBufWithSingleMessage() throws IOException {
+ RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(2);
+ String topic = "my-topic";
+ container.add(createMessage(topic, "hi-1", 0), null);
+ ByteBuf buf = container.toByteBuf();
+
+
+ int idSize = buf.readInt();
+ ByteBuf idBuf = buf.readBytes(idSize);
+ MessageIdData idData = new MessageIdData();
+ idData.parseFrom(idBuf, idSize);
+ Assert.assertEquals(idData.getLedgerId(), 0);
+ Assert.assertEquals(idData.getEntryId(), 0);
+ Assert.assertEquals(idData.getPartition(), -1);
+
+
+ int metadataAndPayloadSize = buf.readInt();
+ ByteBuf metadataAndPayload = buf.readBytes(metadataAndPayloadSize);
+ MessageImpl singleMessageMetadataAndPayload = MessageImpl.deserialize(metadataAndPayload);
+ MessageMetadata metadata = singleMessageMetadataAndPayload.getMessageBuilder();
+ Assert.assertEquals(metadata.getNumMessagesInBatch(), 1);
+ Assert.assertEquals(metadata.getHighestSequenceId(), 0);
+ Assert.assertEquals(metadata.getCompression(), NONE);
+
+ Assert.assertEquals(singleMessageMetadataAndPayload.getPayload().toString(Charset.defaultCharset()), "hi-1");
+ singleMessageMetadataAndPayload.release();
+ metadataAndPayload.release();
+ buf.release();
+ }
+
+ @Test
+ public void testMaxNumMessagesInBatch() {
+ RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(1);
+ String topic = "my-topic";
+
+ boolean isFull = container.add(createMessage(topic, "hi", 0), null);
+ Assert.assertTrue(isFull);
+ Assert.assertTrue(container.isBatchFull());
+ }
+
+ @Test(expectedExceptions = UnsupportedOperationException.class)
+ public void testCreateOpSendMsg() {
+ RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(1);
+ container.createOpSendMsg();
+ }
+
+ @Test(expectedExceptions = IllegalStateException.class)
+ public void testToByteBufWithEncryptionWithoutCryptoKeyReader() {
+ setEncryptionAndCompression(true, false);
+ RawBatchMessageContainerImpl container = new RawBatchMessageContainerImpl(1);
+ String topic = "my-topic";
+ container.add(createMessage(topic, "hi-1", 0), null);
+ container.toByteBuf();
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
index 62e98cd1d7b568..a3c017beeb36eb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
@@ -31,6 +31,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.reflect.FieldUtils;
@@ -277,4 +278,51 @@ public void testAck(boolean partitionedTopic) throws Exception {
}
+
+ @Test(timeOut = 30 * 1000)
+ public void testListen() throws Exception {
+ String topic = "persistent://public/default/tableview-listen-test";
+ admin.topics().createNonPartitionedTopic(topic);
+
+ @Cleanup
+ Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+
+ for (int i = 0; i < 5; i++) {
+ producer.newMessage().key("key:" + i).value("value" + i).send();
+ }
+
+ @Cleanup
+ TableView tv = pulsarClient.newTableViewBuilder(Schema.STRING)
+ .topic(topic)
+ .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
+ .create();
+
+ class MockAction implements BiConsumer {
+ int acceptedCount = 0;
+ @Override
+ public void accept(String s, String s2) {
+ acceptedCount++;
+ }
+ }
+ MockAction mockAction = new MockAction();
+ tv.listen((k, v) -> mockAction.accept(k, v));
+
+ Awaitility.await()
+ .pollInterval(1, TimeUnit.SECONDS)
+ .atMost(Duration.ofMillis(5000))
+ .until(() -> tv.size() == 5);
+
+ assertEquals(mockAction.acceptedCount, 0);
+
+ for (int i = 5; i < 10; i++) {
+ producer.newMessage().key("key:" + i).value("value" + i).send();
+ }
+
+ Awaitility.await()
+ .pollInterval(1, TimeUnit.SECONDS)
+ .atMost(Duration.ofMillis(5000))
+ .until(() -> tv.size() == 10);
+
+ assertEquals(mockAction.acceptedCount, 5);
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
index f613dda3bf3567..055c595fbfec88 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java
@@ -31,6 +31,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -54,8 +55,9 @@
@Slf4j
@Test(groups = "broker")
public class CompactionRetentionTest extends MockedPulsarServiceBaseTest {
- private ScheduledExecutorService compactionScheduler;
- private BookKeeper bk;
+ protected ScheduledExecutorService compactionScheduler;
+ protected BookKeeper bk;
+ private TwoPhaseCompactor compactor;
@BeforeMethod
@Override
@@ -72,18 +74,23 @@ public void setup() throws Exception {
compactionScheduler = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null);
+ compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
}
@AfterMethod(alwaysRun = true)
@Override
public void cleanup() throws Exception {
super.internalCleanup();
-
+ bk.close();
if (compactionScheduler != null) {
compactionScheduler.shutdownNow();
}
}
+ protected long compact(String topic) throws ExecutionException, InterruptedException {
+ return compactor.compact(topic).get();
+ }
+
/**
* Compaction should retain expired keys in the compacted view
*/
@@ -105,8 +112,7 @@ public void testCompaction() throws Exception {
.topic(topic)
.create();
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).join();
+ compact(topic);
log.info(" ---- X 1: {}", mapper.writeValueAsString(
admin.topics().getInternalStats(topic, false)));
@@ -141,7 +147,7 @@ public void testCompaction() throws Exception {
.send();
}
- compactor.compact(topic).join();
+ compact(topic);
validateMessages(pulsarClient, true, topic, round, allKeys);
@@ -152,7 +158,7 @@ public void testCompaction() throws Exception {
.send();
}
- compactor.compact(topic).join();
+ compact(topic);
log.info(" ---- X 4: {}", mapper.writeValueAsString(
admin.topics().getInternalStats(topic, false)));
@@ -221,8 +227,6 @@ private void testCompactionCursorRetention(String topic) throws Exception {
.topic(topic)
.create();
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
-
log.info(" ---- X 1: {}", mapper.writeValueAsString(
admin.topics().getInternalStats(topic, false)));
@@ -240,7 +244,7 @@ private void testCompactionCursorRetention(String topic) throws Exception {
validateMessages(pulsarClient, true, topic, round, allKeys);
- compactor.compact(topic).join();
+ compact(topic);
log.info(" ---- X 3: {}", mapper.writeValueAsString(
admin.topics().getInternalStats(topic, false)));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 5df7e094c93e32..ca5933e3ed7e9b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -75,6 +75,7 @@
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -92,8 +93,9 @@
@Test(groups = "flaky")
public class CompactionTest extends MockedPulsarServiceBaseTest {
- private ScheduledExecutorService compactionScheduler;
- private BookKeeper bk;
+ protected ScheduledExecutorService compactionScheduler;
+ protected BookKeeper bk;
+ private TwoPhaseCompactor compactor;
@BeforeMethod
@Override
@@ -108,18 +110,33 @@ public void setup() throws Exception {
compactionScheduler = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build());
bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null);
+ compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
}
@AfterMethod(alwaysRun = true)
@Override
public void cleanup() throws Exception {
super.internalCleanup();
-
+ bk.close();
if (compactionScheduler != null) {
compactionScheduler.shutdownNow();
}
}
+ protected long compact(String topic) throws ExecutionException, InterruptedException {
+ return compactor.compact(topic).get();
+ }
+
+
+ protected long compact(String topic, CryptoKeyReader cryptoKeyReader)
+ throws ExecutionException, InterruptedException {
+ return compactor.compact(topic).get();
+ }
+
+ protected TwoPhaseCompactor getCompactor() {
+ return compactor;
+ }
+
@Test
public void testCompaction() throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";
@@ -147,8 +164,7 @@ public void testCompaction() throws Exception {
all.add(Pair.of(key, data));
}
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic, false);
// Compacted topic ledger should have same number of entry equals to number of unique key.
@@ -213,9 +229,7 @@ public void testCompactionWithReader() throws Exception {
all.add(Pair.of(key, value));
}
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
-
+ compact(topic);
// consumer with readCompacted enabled only get compacted entries
@@ -278,8 +292,7 @@ public void testReadCompactedBeforeCompaction() throws Exception {
Assert.assertEquals(m.getData(), "content2".getBytes());
}
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
try (Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.readCompacted(true).subscribe()) {
@@ -304,8 +317,7 @@ public void testReadEntriesAfterCompaction() throws Exception {
producer.newMessage().key("key0").value("content1".getBytes()).send();
producer.newMessage().key("key0").value("content2".getBytes()).send();
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
producer.newMessage().key("key0").value("content3".getBytes()).send();
@@ -334,8 +346,7 @@ public void testSeekEarliestAfterCompaction() throws Exception {
producer.newMessage().key("key0").value("content1".getBytes()).send();
producer.newMessage().key("key0").value("content2".getBytes()).send();
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
try (Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.readCompacted(true).subscribe()) {
@@ -378,8 +389,7 @@ public void testBrokerRestartAfterCompaction() throws Exception {
producer.newMessage().key("key0").value("content1".getBytes()).send();
producer.newMessage().key("key0").value("content2".getBytes()).send();
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
try (Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.readCompacted(true).subscribe()) {
@@ -417,7 +427,7 @@ public void testCompactEmptyTopic() throws Exception {
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
- new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
+ compact(topic);
producer.newMessage().key("key0").value("content0".getBytes()).send();
@@ -453,8 +463,7 @@ public void testFirstMessageRetained() throws Exception {
}
// compact the topic
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
// Check that messages after compaction have same ids
try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
@@ -512,8 +521,7 @@ public void testBatchMessageIdsDontChange() throws Exception {
((BatchMessageIdImpl)messages.get(2).getMessageId()).getEntryId());
// compact the topic
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
// Check that messages after compaction have same ids
try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
@@ -521,12 +529,23 @@ public void testBatchMessageIdsDontChange() throws Exception {
Message message1 = consumer.receive();
Assert.assertEquals(message1.getKey(), "key1");
Assert.assertEquals(new String(message1.getData()), "my-message-1");
- Assert.assertEquals(message1.getMessageId(), messages.get(0).getMessageId());
Message message2 = consumer.receive();
Assert.assertEquals(message2.getKey(), "key2");
Assert.assertEquals(new String(message2.getData()), "my-message-3");
- Assert.assertEquals(message2.getMessageId(), messages.get(2).getMessageId());
+ if (getCompactor() instanceof StrategicTwoPhaseCompactor) {
+ MessageIdImpl id = (MessageIdImpl) messages.get(0).getMessageId();
+ MessageIdImpl id1 = new MessageIdImpl(
+ id.getLedgerId(), id.getEntryId(), id.getPartitionIndex());
+ Assert.assertEquals(message1.getMessageId(), id1);
+ id = (MessageIdImpl) messages.get(2).getMessageId();
+ MessageIdImpl id2 = new MessageIdImpl(
+ id.getLedgerId(), id.getEntryId(), id.getPartitionIndex());
+ Assert.assertEquals(message2.getMessageId(), id2);
+ } else {
+ Assert.assertEquals(message1.getMessageId(), messages.get(0).getMessageId());
+ Assert.assertEquals(message2.getMessageId(), messages.get(2).getMessageId());
+ }
}
}
@@ -556,8 +575,7 @@ public void testWholeBatchCompactedOut() throws Exception {
}
// compact the topic
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
.subscriptionName("sub1").readCompacted(true).subscribe()){
@@ -593,34 +611,46 @@ public void testKeyLessMessagesPassThrough() throws Exception {
}
// compact the topic
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
.subscriptionName("sub1").readCompacted(true).subscribe()){
- Message message1 = consumer.receive();
- Assert.assertFalse(message1.hasKey());
- Assert.assertEquals(new String(message1.getData()), "my-message-1");
+ if (getCompactor() instanceof StrategicTwoPhaseCompactor) {
+ Message message3 = consumer.receive();
+ Assert.assertEquals(message3.getKey(), "key1");
+ Assert.assertEquals(new String(message3.getData()), "my-message-4");
- Message message2 = consumer.receive();
- Assert.assertFalse(message2.hasKey());
- Assert.assertEquals(new String(message2.getData()), "my-message-2");
+ Message message4 = consumer.receive();
+ Assert.assertEquals(message4.getKey(), "key2");
+ Assert.assertEquals(new String(message4.getData()), "my-message-6");
- Message message3 = consumer.receive();
- Assert.assertEquals(message3.getKey(), "key1");
- Assert.assertEquals(new String(message3.getData()), "my-message-4");
+ Message m = consumer.receive(2, TimeUnit.SECONDS);
+ assertNull(m);
+ } else {
+ Message message1 = consumer.receive();
+ Assert.assertFalse(message1.hasKey());
+ Assert.assertEquals(new String(message1.getData()), "my-message-1");
- Message message4 = consumer.receive();
- Assert.assertEquals(message4.getKey(), "key2");
- Assert.assertEquals(new String(message4.getData()), "my-message-6");
+ Message message2 = consumer.receive();
+ Assert.assertFalse(message2.hasKey());
+ Assert.assertEquals(new String(message2.getData()), "my-message-2");
- Message message5 = consumer.receive();
- Assert.assertFalse(message5.hasKey());
- Assert.assertEquals(new String(message5.getData()), "my-message-7");
+ Message message3 = consumer.receive();
+ Assert.assertEquals(message3.getKey(), "key1");
+ Assert.assertEquals(new String(message3.getData()), "my-message-4");
- Message message6 = consumer.receive();
- Assert.assertFalse(message6.hasKey());
- Assert.assertEquals(new String(message6.getData()), "my-message-8");
+ Message message4 = consumer.receive();
+ Assert.assertEquals(message4.getKey(), "key2");
+ Assert.assertEquals(new String(message4.getData()), "my-message-6");
+
+ Message message5 = consumer.receive();
+ Assert.assertFalse(message5.hasKey());
+ Assert.assertEquals(new String(message5.getData()), "my-message-7");
+
+ Message message6 = consumer.receive();
+ Assert.assertFalse(message6.hasKey());
+ Assert.assertEquals(new String(message6.getData()), "my-message-8");
+ }
}
}
@@ -693,8 +723,7 @@ public void testEmptyPayloadDeletes() throws Exception {
}
// compact the topic
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
.subscriptionName("sub1").readCompacted(true).subscribe()){
@@ -773,8 +802,7 @@ public void testEmptyPayloadDeletesWhenCompressed() throws Exception {
}
// compact the topic
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
.subscriptionName("sub1").readCompacted(true).subscribe()){
@@ -837,8 +865,7 @@ public void testCompactorReadsCompacted() throws Exception {
Assert.assertTrue(ledgersOpened.isEmpty()); // no ledgers should have been opened
// compact the topic
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
// should have opened all except last to read
Assert.assertTrue(ledgersOpened.contains(info.ledgers.get(0).ledgerId));
@@ -866,7 +893,7 @@ public void testCompactorReadsCompacted() throws Exception {
ledgersOpened.clear();
// compact the topic again
- compactor.compact(topic).get();
+ compact(topic);
// shouldn't have opened first ledger (already compacted), penultimate would have some uncompacted data.
// last ledger already open for writing
@@ -916,8 +943,7 @@ public void testCompactCompressedNoBatch() throws Exception {
}
// compact the topic
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
.subscriptionName("sub1").readCompacted(true).subscribe()){
@@ -960,8 +986,7 @@ public void testCompactCompressedBatching() throws Exception {
}
// compact the topic
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
.subscriptionName("sub1").readCompacted(true).subscribe()){
@@ -975,7 +1000,7 @@ public void testCompactCompressedBatching() throws Exception {
}
}
- class EncKeyReader implements CryptoKeyReader {
+ public static class EncKeyReader implements CryptoKeyReader {
EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
@Override
@@ -1037,8 +1062,7 @@ public void testCompactEncryptedNoBatch() throws Exception {
}
// compact the topic
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic, new EncKeyReader());
// Check that messages after compaction have same ids
try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
@@ -1083,11 +1107,8 @@ public void testCompactEncryptedBatching() throws Exception {
}
// compact the topic
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic, new EncKeyReader());
- // with encryption, all messages are passed through compaction as it doesn't
- // have the keys to decrypt the batch payload
try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
.subscriptionName("sub1").cryptoKeyReader(new EncKeyReader())
.readCompacted(true).subscribe()){
@@ -1095,13 +1116,21 @@ public void testCompactEncryptedBatching() throws Exception {
Assert.assertEquals(message1.getKey(), "key1");
Assert.assertEquals(new String(message1.getData()), "my-message-1");
- Message message2 = consumer.receive();
- Assert.assertEquals(message2.getKey(), "key2");
- Assert.assertEquals(new String(message2.getData()), "my-message-2");
-
- Message message3 = consumer.receive();
- Assert.assertEquals(message3.getKey(), "key2");
- Assert.assertEquals(new String(message3.getData()), "my-message-3");
+ if (getCompactor() instanceof StrategicTwoPhaseCompactor) {
+ Message message3 = consumer.receive();
+ Assert.assertEquals(message3.getKey(), "key2");
+ Assert.assertEquals(new String(message3.getData()), "my-message-3");
+ } else {
+ // with encryption, all messages are passed through compaction as it doesn't
+ // have the keys to decrypt the batch payload
+ Message message2 = consumer.receive();
+ Assert.assertEquals(message2.getKey(), "key2");
+ Assert.assertEquals(new String(message2.getData()), "my-message-2");
+
+ Message message3 = consumer.receive();
+ Assert.assertEquals(message3.getKey(), "key2");
+ Assert.assertEquals(new String(message3.getData()), "my-message-3");
+ }
}
}
@@ -1132,8 +1161,7 @@ public void testCompactEncryptedAndCompressedNoBatch() throws Exception {
}
// compact the topic
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic, new EncKeyReader());
// Check that messages after compaction have same ids
try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
@@ -1179,8 +1207,7 @@ public void testCompactEncryptedAndCompressedBatching() throws Exception {
}
// compact the topic
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic, new EncKeyReader());
// with encryption, all messages are passed through compaction as it doesn't
// have the keys to decrypt the batch payload
@@ -1191,13 +1218,20 @@ public void testCompactEncryptedAndCompressedBatching() throws Exception {
Assert.assertEquals(message1.getKey(), "key1");
Assert.assertEquals(new String(message1.getData()), "my-message-1");
- Message message2 = consumer.receive();
- Assert.assertEquals(message2.getKey(), "key2");
- Assert.assertEquals(new String(message2.getData()), "my-message-2");
- Message message3 = consumer.receive();
- Assert.assertEquals(message3.getKey(), "key2");
- Assert.assertEquals(new String(message3.getData()), "my-message-3");
+ if (getCompactor() instanceof StrategicTwoPhaseCompactor) {
+ Message message3 = consumer.receive();
+ Assert.assertEquals(message3.getKey(), "key2");
+ Assert.assertEquals(new String(message3.getData()), "my-message-3");
+ } else {
+ Message message2 = consumer.receive();
+ Assert.assertEquals(message2.getKey(), "key2");
+ Assert.assertEquals(new String(message2.getData()), "my-message-2");
+
+ Message message3 = consumer.receive();
+ Assert.assertEquals(message3.getKey(), "key2");
+ Assert.assertEquals(new String(message3.getData()), "my-message-3");
+ }
}
}
@@ -1254,8 +1288,7 @@ public void testEmptyPayloadDeletesWhenEncrypted() throws Exception {
}
// compact the topic
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic, new EncKeyReader());
try (Consumer consumer = pulsarClient.newConsumer().topic(topic)
.cryptoKeyReader(new EncKeyReader())
@@ -1264,22 +1297,32 @@ public void testEmptyPayloadDeletesWhenEncrypted() throws Exception {
Assert.assertEquals(message1.getKey(), "key0");
Assert.assertEquals(new String(message1.getData()), "my-message-0");
- // see all messages from batch
- Message message2 = consumer.receive();
- Assert.assertEquals(message2.getKey(), "key2");
- Assert.assertEquals(new String(message2.getData()), "my-message-2");
+ if (getCompactor() instanceof StrategicTwoPhaseCompactor) {
+ Message message3 = consumer.receive();
+ Assert.assertEquals(message3.getKey(), "key3");
+ Assert.assertEquals(new String(message3.getData()), "my-message-3");
- Message message3 = consumer.receive();
- Assert.assertEquals(message3.getKey(), "key3");
- Assert.assertEquals(new String(message3.getData()), "my-message-3");
-
- Message message4 = consumer.receive();
- Assert.assertEquals(message4.getKey(), "key2");
- Assert.assertEquals(new String(message4.getData()), "");
-
- Message message5 = consumer.receive();
- Assert.assertEquals(message5.getKey(), "key4");
- Assert.assertEquals(new String(message5.getData()), "my-message-4");
+ Message message5 = consumer.receive();
+ Assert.assertEquals(message5.getKey(), "key4");
+ Assert.assertEquals(new String(message5.getData()), "my-message-4");
+ } else {
+ // see all messages from batch
+ Message message2 = consumer.receive();
+ Assert.assertEquals(message2.getKey(), "key2");
+ Assert.assertEquals(new String(message2.getData()), "my-message-2");
+
+ Message message3 = consumer.receive();
+ Assert.assertEquals(message3.getKey(), "key3");
+ Assert.assertEquals(new String(message3.getData()), "my-message-3");
+
+ Message message4 = consumer.receive();
+ Assert.assertEquals(message4.getKey(), "key2");
+ Assert.assertEquals(new String(message4.getData()), "");
+
+ Message message5 = consumer.receive();
+ Assert.assertEquals(message5.getKey(), "key4");
+ Assert.assertEquals(new String(message5.getData()), "my-message-4");
+ }
}
}
@@ -1303,8 +1346,7 @@ public void testCompactionWithLastDeletedKey(boolean batching) throws Exception
producer.newMessage().key("1").value("".getBytes()).send();
producer.newMessage().key("2").value("".getBytes()).send();
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
Set expected = Sets.newHashSet("3");
// consumer with readCompacted enabled only get compacted entries
@@ -1329,8 +1371,7 @@ public void testEmptyCompactionLedger(boolean batching) throws Exception {
producer.newMessage().key("1").value("".getBytes()).send();
producer.newMessage().key("2").value("".getBytes()).send();
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
// consumer with readCompacted enabled only get compacted entries
try (Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1364,8 +1405,7 @@ public void testAllEmptyCompactionLedger(boolean batchEnabled) throws Exception
FutureUtil.waitForAll(futures).get();
// 2.compact the topic.
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
// consumer with readCompacted enabled only get compacted entries
try (Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1409,8 +1449,7 @@ public void testBatchAndNonBatchWithoutEmptyPayload() throws PulsarClientExcepti
FutureUtil.waitForAll(futures).get();
// 2.compact the topic.
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
// consumer with readCompacted enabled only get compacted entries
try (Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1465,8 +1504,7 @@ public void testBatchAndNonBatchWithEmptyPayload() throws PulsarClientException,
FutureUtil.waitForAll(futures).get();
// 2.compact the topic.
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
// consumer with readCompacted enabled only get compacted entries
try (Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1519,8 +1557,7 @@ public void testBatchAndNonBatchEndOfEmptyPayload() throws PulsarClientException
FutureUtil.waitForAll(futures).get();
// 2.compact the topic.
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
// consumer with readCompacted enabled only get compacted entries
try (Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
@@ -1559,8 +1596,7 @@ public void testCompactMultipleTimesWithoutEmptyMessage(boolean batchEnabled) th
FutureUtil.waitForAll(futures).get();
// 2.compact the topic.
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
// 3. Send more ten messages
futures.clear();
@@ -1608,8 +1644,7 @@ public void testReadUnCompacted(boolean batchEnabled) throws PulsarClientExcepti
FutureUtil.waitForAll(futures).get();
// 2.compact the topic.
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
// 3. Send more ten messages
futures.clear();
@@ -1638,7 +1673,7 @@ public void testReadUnCompacted(boolean batchEnabled) throws PulsarClientExcepti
producer.newMessage().key(key).value(("").getBytes()).send();
// 5.compact the topic.
- compactor.compact(topic).get();
+ compact(topic);
try (Consumer consumer = pulsarClient.newConsumer()
.topic(topic)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index 8099cd51668294..e86be6a4db8162 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -33,6 +33,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Random;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -58,7 +59,12 @@
@Test(groups = "broker-compaction")
public class CompactorTest extends MockedPulsarServiceBaseTest {
- private ScheduledExecutorService compactionScheduler;
+ protected ScheduledExecutorService compactionScheduler;
+
+ protected BookKeeper bk;
+ protected Compactor compactor;
+
+
@BeforeMethod
@Override
@@ -73,20 +79,31 @@ public void setup() throws Exception {
compactionScheduler = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
+ bk = pulsar.getBookKeeperClientFactory().create(
+ this.conf, null, null, Optional.empty(), null);
+ compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
}
+
@AfterMethod(alwaysRun = true)
@Override
public void cleanup() throws Exception {
super.internalCleanup();
+ bk.close();
compactionScheduler.shutdownNow();
}
+ protected long compact(String topic) throws ExecutionException, InterruptedException {
+ return compactor.compact(topic).get();
+ }
+
+ protected Compactor getCompactor() {
+ return compactor;
+ }
+
private List compactAndVerify(String topic, Map expected, boolean checkMetrics) throws Exception {
- BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
- this.conf, null, null, Optional.empty(), null);
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- long compactedLedgerId = compactor.compact(topic).get();
+
+ long compactedLedgerId = compact(topic);
LedgerHandle ledger = bk.openLedger(compactedLedgerId,
Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
@@ -111,7 +128,7 @@ private List compactAndVerify(String topic, Map expected
m.close();
}
if (checkMetrics) {
- CompactionRecord compactionRecord = compactor.getStats().getCompactionRecordForTopic(topic).get();
+ CompactionRecord compactionRecord = getCompactor().getStats().getCompactionRecordForTopic(topic).get();
long compactedTopicRemovedEventCount = compactionRecord.getLastCompactionRemovedEventCount();
long lastCompactSucceedTimestamp = compactionRecord.getLastCompactionSucceedTimestamp();
long lastCompactFailedTimestamp = compactionRecord.getLastCompactionFailedTimestamp();
@@ -227,10 +244,7 @@ public void testCompactEmptyTopic() throws Exception {
// trigger creation of topic on server side
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
- BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
- this.conf, null, null, Optional.empty(), null);
- Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
- compactor.compact(topic).get();
+ compact(topic);
}
@Test
@@ -240,7 +254,6 @@ public void testPhaseOneLoopTimeConfiguration() {
TwoPhaseCompactor compactor = new TwoPhaseCompactor(configuration, Mockito.mock(PulsarClientImpl.class),
Mockito.mock(BookKeeper.class), compactionScheduler);
Assert.assertEquals(compactor.getPhaseOneLoopReadTimeoutInSeconds(), 60);
-
}
public ByteBuf extractPayload(RawMessage m) throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/NumericOrderCompactionStrategy.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/NumericOrderCompactionStrategy.java
new file mode 100644
index 00000000000000..e801f006f6d3cd
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/NumericOrderCompactionStrategy.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
+
+public class NumericOrderCompactionStrategy implements TopicCompactionStrategy {
+
+ @Override
+ public Schema getSchema() {
+ return Schema.INT32;
+ }
+
+ @Override
+ public boolean isValid(Integer prev, Integer cur) {
+ if (prev == null || cur == null) {
+ return true;
+ }
+ return prev < cur;
+ }
+ }
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionRetentionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionRetentionTest.java
new file mode 100644
index 00000000000000..1cac04c2fa9562
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionRetentionTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import java.util.concurrent.ExecutionException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class StrategicCompactionRetentionTest extends CompactionRetentionTest {
+ private TopicCompactionStrategy strategy;
+ private StrategicTwoPhaseCompactor compactor;
+
+ @BeforeMethod
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 1);
+ strategy = new TopicCompactionStrategyTest.DummyTopicCompactionStrategy();
+ }
+
+ @Override
+ protected long compact(String topic) throws ExecutionException, InterruptedException {
+ return (long) compactor.compact(topic, strategy).get();
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
new file mode 100644
index 00000000000000..ba60672cc51d06
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.TableView;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "flaky")
+public class StrategicCompactionTest extends CompactionTest {
+ private TopicCompactionStrategy strategy;
+ private StrategicTwoPhaseCompactor compactor;
+
+ @BeforeMethod
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 1);
+ strategy = new TopicCompactionStrategyTest.DummyTopicCompactionStrategy();
+ }
+
+ @Override
+ protected long compact(String topic) throws ExecutionException, InterruptedException {
+ return (long) compactor.compact(topic, strategy).get();
+ }
+
+ @Override
+ protected long compact(String topic, CryptoKeyReader cryptoKeyReader)
+ throws ExecutionException, InterruptedException {
+ return (long) compactor.compact(topic, strategy, cryptoKeyReader).get();
+ }
+
+ @Override
+ protected TwoPhaseCompactor getCompactor() {
+ return compactor;
+ }
+
+
+ @Test
+ public void testNumericOrderCompaction() throws Exception {
+
+ strategy = new NumericOrderCompactionStrategy();
+
+ String topic = "persistent://my-property/use/my-ns/my-topic1";
+ final int numMessages = 50;
+ final int maxKeys = 5;
+
+ Producer producer = pulsarClient.newProducer(strategy.getSchema())
+ .topic(topic)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+
+ Map expected = new HashMap<>();
+ List> all = new ArrayList<>();
+ Random r = new Random(0);
+
+ pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();
+
+ for (int j = 0; j < numMessages; j++) {
+ int keyIndex = r.nextInt(maxKeys);
+ String key = "key" + keyIndex;
+ int seed = r.nextInt(j + 1);
+ Integer cur = seed < j / 5 ? null : seed;
+ producer.newMessage().key(key).value(cur).send();
+ Integer prev = expected.get(key);
+ if (strategy.isValid(prev, cur)) {
+ if (cur == null) {
+ expected.remove(key);
+ } else {
+ expected.put(key, cur);
+ }
+ }
+ all.add(Pair.of(key, cur));
+ }
+
+ compact(topic);
+
+ PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic, false);
+ // Compacted topic ledger should have same number of entry equals to number of unique key.
+ Assert.assertEquals(expected.size(), internalStats.compactedLedger.entries);
+ Assert.assertTrue(internalStats.compactedLedger.ledgerId > -1);
+ Assert.assertFalse(internalStats.compactedLedger.offloaded);
+
+ Map expectedCopy = new HashMap<>(expected);
+ // consumer with readCompacted enabled only get compacted entries
+ try (Consumer consumer = pulsarClient.newConsumer(strategy.getSchema()).topic(topic).subscriptionName("sub1")
+ .readCompacted(true).subscribe()) {
+ while (!expected.isEmpty()) {
+ Message m = consumer.receive(2, TimeUnit.SECONDS);
+ Assert.assertEquals(m.getValue(), expected.remove(m.getKey()), m.getKey());
+ }
+ Assert.assertTrue(expected.isEmpty());
+ }
+
+ // can get full backlog if read compacted disabled
+ try (Consumer consumer = pulsarClient.newConsumer(strategy.getSchema()).topic(topic).subscriptionName("sub1")
+ .readCompacted(false).subscribe()) {
+ while (true) {
+ Message m = consumer.receive(2, TimeUnit.SECONDS);
+ Pair expectedMessage = all.remove(0);
+ Assert.assertEquals(m.getKey(), expectedMessage.getLeft());
+ Assert.assertEquals(m.getValue(), expectedMessage.getRight());
+ if (all.isEmpty()) {
+ break;
+ }
+ }
+ Assert.assertTrue(all.isEmpty());
+ }
+
+ TableView tableView = pulsar.getClient().newTableViewBuilder(strategy.getSchema())
+ .topic(topic)
+ .loadConf(Map.of(
+ "topicCompactionStrategy", strategy.getClass().getCanonicalName()))
+ .create();
+ Assert.assertEquals(tableView.entrySet(), expectedCopy.entrySet());
+ }
+
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactorTest.java
new file mode 100644
index 00000000000000..91dd8a2bd358b9
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactorTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import java.util.concurrent.ExecutionException;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-compaction")
+public class StrategicCompactorTest extends CompactorTest {
+ private TopicCompactionStrategy strategy;
+
+ private StrategicTwoPhaseCompactor compactor;
+
+ @BeforeMethod
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 1);
+ strategy = new TopicCompactionStrategyTest.DummyTopicCompactionStrategy();
+ }
+
+ @Override
+ protected long compact(String topic) throws ExecutionException, InterruptedException {
+ return (long) compactor.compact(topic, strategy).get();
+ }
+
+ @Override
+ protected Compactor getCompactor() {
+ return compactor;
+ }
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionStrategyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionStrategyTest.java
new file mode 100644
index 00000000000000..472bea3d39b92d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionStrategyTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-compaction")
+public class TopicCompactionStrategyTest {
+ public static class DummyTopicCompactionStrategy implements TopicCompactionStrategy {
+
+ @Override
+ public Schema getSchema() {
+ return Schema.BYTES;
+ }
+
+ @Override
+ public boolean isValid(byte[] prev, byte[] cur) {
+ return true;
+ }
+ }
+
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testLoadInvalidTopicCompactionStrategy() {
+ TopicCompactionStrategy.load("uknown");
+ }
+
+ @Test
+ public void testNumericOrderCompactionStrategy() {
+ TopicCompactionStrategy strategy =
+ TopicCompactionStrategy.load(NumericOrderCompactionStrategy.class.getCanonicalName());
+ Assert.assertTrue(strategy.isValid(1, 2));
+ Assert.assertFalse(strategy.isValid(2, 1));
+ }
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java
index 136d8f5382304a..9e5008c8bd0c8d 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java
@@ -89,7 +89,15 @@ public interface TableView extends Closeable {
void forEach(BiConsumer action);
/**
- * Performs the give action for each entry in this map until all entries
+ * Performs the given action for each future entry in this map until all entries
+ * have been processed or the action throws an exception.
+ *
+ * @param action The action to be performed for each entry
+ */
+ void listen(BiConsumer action);
+
+ /**
+ * Performs the given action for each entry in this map until all entries
* have been processed or the action throws an exception.
*
* @param action The action to be performed for each entry
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index c601f51a725270..fdbf1f15c296a8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -49,21 +49,21 @@
*/
class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
- private MessageMetadata messageMetadata = new MessageMetadata();
+ protected MessageMetadata messageMetadata = new MessageMetadata();
// sequence id for this batch which will be persisted as a single entry by broker
@Getter
@Setter
- private long lowestSequenceId = -1L;
+ protected long lowestSequenceId = -1L;
@Getter
@Setter
- private long highestSequenceId = -1L;
- private ByteBuf batchedMessageMetadataAndPayload;
- private List> messages = new ArrayList<>(maxMessagesNum);
+ protected long highestSequenceId = -1L;
+ protected ByteBuf batchedMessageMetadataAndPayload;
+ protected List> messages = new ArrayList<>(maxMessagesNum);
protected SendCallback previousCallback = null;
// keep track of callbacks for individual messages being published in a batch
protected SendCallback firstCallback;
- private final ByteBufAllocator allocator;
+ protected final ByteBufAllocator allocator;
private static final int SHRINK_COOLING_OFF_PERIOD = 10;
private int consecutiveShrinkTime = 0;
@@ -111,9 +111,11 @@ public boolean add(MessageImpl> msg, SendCallback callback) {
}
} catch (Throwable e) {
log.error("construct first message failed, exception is ", e);
- producer.semaphoreRelease(getNumMessagesInBatch());
- producer.client.getMemoryLimitController().releaseMemory(msg.getUncompressedSize()
- + batchAllocatedSizeBytes);
+ if (producer != null) {
+ producer.semaphoreRelease(getNumMessagesInBatch());
+ producer.client.getMemoryLimitController().releaseMemory(msg.getUncompressedSize()
+ + batchAllocatedSizeBytes);
+ }
discard(new PulsarClientException(e));
return false;
}
@@ -131,11 +133,14 @@ public boolean add(MessageImpl> msg, SendCallback callback) {
messageMetadata.setSequenceId(lowestSequenceId);
}
highestSequenceId = msg.getSequenceId();
- ProducerImpl.LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(producer, prev -> Math.max(prev, msg.getSequenceId()));
+ if (producer != null) {
+ ProducerImpl.LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(producer, prev -> Math.max(prev, msg.getSequenceId()));
+ }
+
return isBatchFull();
}
- private ByteBuf getCompressedBatchMetadataAndPayload() {
+ protected ByteBuf getCompressedBatchMetadataAndPayload() {
int batchWriteIndex = batchedMessageMetadataAndPayload.writerIndex();
int batchReadIndex = batchedMessageMetadataAndPayload.readerIndex();
@@ -308,11 +313,13 @@ public OpSendMsg createOpSendMsg() throws IOException {
return op;
}
- private void updateAndReserveBatchAllocatedSize(int updatedSizeBytes) {
+ protected void updateAndReserveBatchAllocatedSize(int updatedSizeBytes) {
int delta = updatedSizeBytes - batchAllocatedSizeBytes;
batchAllocatedSizeBytes = updatedSizeBytes;
if (delta != 0) {
- producer.client.getMemoryLimitController().forceReserveMemory(delta);
+ if (producer != null) {
+ producer.client.getMemoryLimitController().forceReserveMemory(delta);
+ }
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 238beffc79f88b..7af1ed2dc901da 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -36,7 +36,6 @@
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderListener;
import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
@@ -69,7 +68,8 @@ public ReaderImpl(PulsarClientImpl client, ReaderConfigurationData readerConf
consumerConfiguration.getTopicNames().add(readerConfiguration.getTopicName());
consumerConfiguration.setSubscriptionName(subscription);
consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
- consumerConfiguration.setSubscriptionMode(SubscriptionMode.NonDurable);
+ consumerConfiguration.setSubscriptionMode(readerConfiguration.getSubscriptionMode());
+ consumerConfiguration.setSubscriptionInitialPosition(readerConfiguration.getSubscriptionInitialPosition());
consumerConfiguration.setReceiverQueueSize(readerConfiguration.getReceiverQueueSize());
consumerConfiguration.setReadCompacted(readerConfiguration.isReadCompacted());
consumerConfiguration.setPoolMessages(readerConfiguration.isPoolMessages());
@@ -164,8 +164,8 @@ public Message readNext() throws PulsarClientException {
@Override
public Message readNext(int timeout, TimeUnit unit) throws PulsarClientException {
- Message msg = consumer.receive(timeout, unit);
+ Message msg = consumer.receive(timeout, unit);
if (msg != null) {
consumer.acknowledgeCumulativeAsync(msg).exceptionally(ex -> {
log.warn("[{}][{}] acknowledge message {} cumulative fail.", getTopic(),
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java
index ecc8135b2ac953..863765e80c57da 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java
@@ -29,6 +29,7 @@ public class TableViewConfigurationData implements Serializable, Cloneable {
private String topicName = null;
private long autoUpdatePartitionsSeconds = 60;
+ private String topicCompactionStrategy;
@Override
public TableViewConfigurationData clone() {
@@ -36,6 +37,7 @@ public TableViewConfigurationData clone() {
TableViewConfigurationData clone = (TableViewConfigurationData) super.clone();
clone.setTopicName(topicName);
clone.setAutoUpdatePartitionsSeconds(autoUpdatePartitionsSeconds);
+ clone.setTopicCompactionStrategy(topicCompactionStrategy);
return clone;
} catch (CloneNotSupportedException e) {
throw new AssertionError();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index a6f39f44e01ee5..3d3c48a440e667 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -38,6 +38,7 @@
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
+import org.apache.pulsar.common.topics.TopicCompactionStrategy;
@Slf4j
public class TableViewImpl implements TableView {
@@ -51,6 +52,7 @@ public class TableViewImpl implements TableView {
private final List> listeners;
private final ReentrantLock listenersMutex;
+ private TopicCompactionStrategy compactionStrategy;
TableViewImpl(PulsarClientImpl client, Schema schema, TableViewConfigurationData conf) {
this.conf = conf;
@@ -58,6 +60,7 @@ public class TableViewImpl implements TableView {
this.immutableData = Collections.unmodifiableMap(data);
this.listeners = new ArrayList<>();
this.listenersMutex = new ReentrantLock();
+ this.compactionStrategy = TopicCompactionStrategy.load(conf.getTopicCompactionStrategy());
this.reader = client.newReader(schema)
.topic(conf.getTopicName())
.startMessageId(MessageId.earliest)
@@ -113,6 +116,16 @@ public void forEach(BiConsumer action) {
data.forEach(action);
}
+ @Override
+ public void listen(BiConsumer action) {
+ try {
+ listenersMutex.lock();
+ listeners.add(action);
+ } finally {
+ listenersMutex.unlock();
+ }
+ }
+
@Override
public void forEachAndListen(BiConsumer action) {
// Ensure we iterate over all the existing entry _and_ start the listening from the exact next message
@@ -145,30 +158,40 @@ public void close() throws PulsarClientException {
private void handleMessage(Message msg) {
try {
if (msg.hasKey()) {
+ String key = msg.getKey();
+ T cur = msg.size() > 0 ? msg.getValue() : null;
if (log.isDebugEnabled()) {
log.debug("Applying message from topic {}. key={} value={}",
conf.getTopicName(),
- msg.getKey(),
- msg.getValue());
+ key,
+ cur);
}
- try {
- listenersMutex.lock();
- if (null == msg.getValue()){
- data.remove(msg.getKey());
- } else {
- data.put(msg.getKey(), msg.getValue());
- }
+ T prev = data.get(key);
+ boolean update = true;
+ if (compactionStrategy != null) {
+ update = compactionStrategy.isValid(prev, cur);
+ }
+
+ if (update) {
+ try {
+ listenersMutex.lock();
+ if (null == cur) {
+ data.remove(key);
+ } else {
+ data.put(key, cur);
+ }
- for (BiConsumer listener : listeners) {
- try {
- listener.accept(msg.getKey(), msg.getValue());
- } catch (Throwable t) {
- log.error("Table view listener raised an exception", t);
+ for (BiConsumer listener : listeners) {
+ try {
+ listener.accept(key, cur);
+ } catch (Throwable t) {
+ log.error("Table view listener raised an exception", t);
+ }
}
+ } finally {
+ listenersMutex.unlock();
}
- } finally {
- listenersMutex.unlock();
}
}
} finally {
@@ -196,6 +219,9 @@ private void readAllExistingMessages(Reader reader, CompletableFuture {
+ logException(
+ String.format("Reader %s was interrupted while reading existing messages",
+ reader.getTopic()), ex);
future.completeExceptionally(ex);
return null;
});
@@ -219,8 +245,18 @@ private void readTailMessages(Reader reader) {
handleMessage(msg);
readTailMessages(reader);
}).exceptionally(ex -> {
- log.info("Reader {} was interrupted", reader.getTopic());
+ logException(
+ String.format("Reader %s was interrupted while reading tail messages.",
+ reader.getTopic()), ex);
return null;
});
}
+
+ private void logException(String msg, Throwable ex) {
+ if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) {
+ log.warn(msg, ex);
+ } else {
+ log.error(msg, ex);
+ }
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
index 42e115ea872fbe..86707d2aa2f0ea 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
@@ -32,6 +32,8 @@
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.ReaderInterceptor;
import org.apache.pulsar.client.api.ReaderListener;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
@Data
public class ReaderConfigurationData implements Serializable, Cloneable {
@@ -153,6 +155,10 @@ public class ReaderConfigurationData implements Serializable, Cloneable {
private long expireTimeOfIncompleteChunkedMessageMillis = TimeUnit.MINUTES.toMillis(1);
+ private SubscriptionMode subscriptionMode = SubscriptionMode.NonDurable;
+
+ private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;
+
@JsonIgnore
public String getTopicName() {
if (topicNames.size() > 1) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicCompactionStrategy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicCompactionStrategy.java
new file mode 100644
index 00000000000000..75f3fd18dcc955
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicCompactionStrategy.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.topics;
+
+import org.apache.pulsar.client.api.Schema;
+
+/**
+ * Defines a strategy to compact messages in a topic.
+ * This strategy can be passed to Topic Compactor and Table View to select messages in a specific way.
+ *
+ * Examples:
+ *
+ * TopicCompactionStrategy strategy = new MyTopicCompactionStrategy();
+ *
+ * // Run topic compaction by the compaction strategy.
+ * // While compacting messages for each key,
+ * // it will choose messages only if TopicCompactionStrategy.valid(prev, cur) returns true.
+ * StrategicTwoPhaseCompactor compactor = new StrategicTwoPhaseCompactor(...);
+ * compactor.compact(topic, strategy);
+ *
+ * // Run table view by the compaction strategy.
+ * // While updating messages in the table view map,
+ * // it will choose messages only if TopicCompactionStrategy.valid(prev, cur) returns true.
+ * TableView tableView = pulsar.getClient().newTableViewBuilder(strategy.getSchema())
+ * .topic(topic)
+ * .loadConf(Map.of(
+ * "topicCompactionStrategy", strategy.getClass().getCanonicalName()))
+ * .create();
+ */
+public interface TopicCompactionStrategy {
+
+ /**
+ * Returns the schema object for this strategy.
+ * @return
+ */
+ Schema getSchema();
+ /**
+ * Tests if the current message is valid compared to the previous message for the same key.
+ *
+ * @param prev previous message
+ * @param cur current message
+ * @return True if the prev to the cur message transition is valid. Otherwise, false.
+ */
+ boolean isValid(T prev, T cur);
+
+
+ static TopicCompactionStrategy load(String topicCompactionStrategy) {
+ if (topicCompactionStrategy == null) {
+ return null;
+ }
+ try {
+ Class> clazz = Class.forName(topicCompactionStrategy);
+ Object instance = clazz.getDeclaredConstructor().newInstance();
+ return (TopicCompactionStrategy) instance;
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ "Error when loading topic compaction strategy: " + topicCompactionStrategy, e);
+ }
+ }
+}