Skip to content

Commit

Permalink
[improve][broker] added StrategicTwoPhaseCompactor, added listen() in…
Browse files Browse the repository at this point in the history
… TableView
  • Loading branch information
heesung-sn committed Oct 26, 2022
1 parent 29461bd commit 41d03d6
Show file tree
Hide file tree
Showing 26 changed files with 1,797 additions and 176 deletions.
1 change: 0 additions & 1 deletion pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client-messagecrypto-bc</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.CommandAck;

/**
* An extended ReaderImpl used for StrategicTwoPhaseCompactor.
* The compaction consumer subscription is durable and consumes compacted messages from the earliest position.
* It does not acknowledge the message after each read. (needs to call acknowledgeCumulativeAsync to ack messages.)
*/
@Slf4j
public class CompactionReaderImpl<T> extends ReaderImpl<T> {

ConsumerBase<T> consumer;

ReaderConfigurationData<T> readerConfiguration;
private CompactionReaderImpl(PulsarClientImpl client, ReaderConfigurationData<T> readerConfiguration,
ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> consumerFuture,
Schema<T> schema) {
super(client, readerConfiguration, executorProvider, consumerFuture, schema);
this.readerConfiguration = readerConfiguration;
this.consumer = getConsumer();
}

public static <T> CompactionReaderImpl<T> create(PulsarClientImpl client, Schema<T> schema, String topic,
CompletableFuture<Consumer<T>> consumerFuture,
CryptoKeyReader cryptoKeyReader) {
ReaderConfigurationData conf = new ReaderConfigurationData<>();
conf.setTopicName(topic);
conf.setSubscriptionName(COMPACTION_SUBSCRIPTION);
conf.setStartMessageId(MessageId.earliest);
conf.setStartMessageFromRollbackDurationInSec(0);
conf.setReadCompacted(true);
conf.setSubscriptionMode(SubscriptionMode.Durable);
conf.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
conf.setCryptoKeyReader(cryptoKeyReader);
return new CompactionReaderImpl(client, conf, client.externalExecutorProvider(), consumerFuture, schema);
}


@Override
public Message<T> readNext() throws PulsarClientException {
return consumer.receive();
}

@Override
public Message<T> readNext(int timeout, TimeUnit unit) throws PulsarClientException {
return consumer.receive(timeout, unit);
}

@Override
public CompletableFuture<Message<T>> readNextAsync() {
CompletableFuture<Message<T>> receiveFuture = consumer.receiveAsync();
return receiveFuture;
}

public CompletableFuture<MessageId> getLastMessageIdAsync() {
return consumer.getLastMessageIdAsync();
}

public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Map<String, Long> properties) {
return consumer.doAcknowledgeWithTxn(messageId, CommandAck.AckType.Cumulative, properties, null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import java.util.Set;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.proto.CompressionType;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodecNone;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.protocol.Commands;

/**
* A raw batch message container without producer. (Used for StrategicTwoPhaseCompactor)
*
* incoming single messages:
* (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
*
* batched into single batch message:
* [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]
*/
public class RawBatchMessageContainerImpl extends BatchMessageContainerImpl {
MessageCrypto msgCrypto;
Set<String> encryptionKeys;
CryptoKeyReader cryptoKeyReader;
public RawBatchMessageContainerImpl(int maxNumMessagesInBatch) {
super();
compressionType = CompressionType.NONE;
compressor = new CompressionCodecNone();
if (maxNumMessagesInBatch > 0) {
this.maxNumMessagesInBatch = maxNumMessagesInBatch;
}
}
private ByteBuf encrypt(ByteBuf compressedPayload) {
if (msgCrypto == null) {
return compressedPayload;
}
int maxSize = msgCrypto.getMaxOutputSize(compressedPayload.readableBytes());
ByteBuf encryptedPayload = allocator.buffer(maxSize);
ByteBuffer targetBuffer = encryptedPayload.nioBuffer(0, maxSize);

try {
msgCrypto.encrypt(encryptionKeys, cryptoKeyReader, () -> messageMetadata,
compressedPayload.nioBuffer(), targetBuffer);
} catch (PulsarClientException e) {
throw new RuntimeException("Failed to encrypt payload", e);
}
encryptedPayload.writerIndex(targetBuffer.remaining());
compressedPayload.release();
return encryptedPayload;
}

@Override
public ProducerImpl.OpSendMsg createOpSendMsg() {
throw new UnsupportedOperationException();
}
public void setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
this.cryptoKeyReader = cryptoKeyReader;
}

public ByteBuf toByteBuf() {
if (numMessagesInBatch > 1) {
messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
messageMetadata.setSequenceId(lowestSequenceId);
messageMetadata.setHighestSequenceId(highestSequenceId);
}
MessageImpl lastMessage = messages.get(messages.size() - 1);
MessageIdImpl lastMessageId = (MessageIdImpl) lastMessage.getMessageId();
MessageMetadata lastMessageMetadata = lastMessage.getMessageBuilder();

this.compressionType = lastMessageMetadata.getCompression();
this.compressor = CompressionCodecProvider.getCompressionCodec(lastMessageMetadata.getCompression());

if (!lastMessage.getEncryptionCtx().isEmpty()) {
EncryptionContext encryptionContext = (EncryptionContext) lastMessage.getEncryptionCtx().get();

if (cryptoKeyReader == null) {
throw new IllegalStateException("Messages are encrypted but no cryptoKeyReader is provided.");
}

this.encryptionKeys = encryptionContext.getKeys().keySet();
this.msgCrypto =
new MessageCryptoBc(String.format(
"[%s] [%s]", topicName, "RawBatchMessageContainer"), true);
try {
msgCrypto.addPublicKeyCipher(encryptionKeys, cryptoKeyReader);
} catch (PulsarClientException.CryptoException e) {
throw new IllegalArgumentException("Failed to set encryption keys", e);
}
}

ByteBuf encryptedPayload = encrypt(getCompressedBatchMetadataAndPayload());
updateAndReserveBatchAllocatedSize(encryptedPayload.capacity());
ByteBuf metadataAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
messageMetadata, encryptedPayload);

MessageIdData idData = new MessageIdData();
idData.setLedgerId(lastMessageId.getLedgerId());
idData.setEntryId(lastMessageId.getEntryId());
idData.setPartition(lastMessageId.getPartitionIndex());

// Format: [IdSize][Id][metadataAndPayloadSize][metadataAndPayload]
// Following RawMessage.serialize() format as the compacted messages will be parsed as RawMessage in broker
int idSize = idData.getSerializedSize();
int headerSize = 4 /* IdSize */ + idSize + 4 /* metadataAndPayloadSize */;
int totalSize = headerSize + metadataAndPayload.readableBytes();
ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(totalSize);
buf.writeInt(idSize);
idData.writeTo(buf);
buf.writeInt(metadataAndPayload.readableBytes());
buf.writeBytes(metadataAndPayload);
encryptedPayload.release();
clear();
return buf;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public abstract class Compactor {

protected final ServiceConfiguration conf;
protected final ScheduledExecutorService scheduler;
private final PulsarClient pulsar;
private final BookKeeper bk;
protected final PulsarClient pulsar;
protected final BookKeeper bk;
protected final CompactorMXBeanImpl mxBean;

public Compactor(ServiceConfiguration conf,
Expand Down
Loading

0 comments on commit 41d03d6

Please sign in to comment.