Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker][client] PIP-192 PIP-215 Added TopicCompactionStrategy for StrategicTwoPhaseCompactor and TableView. #18195

Merged
merged 5 commits into from
Dec 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client-messagecrypto-bc</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.CommandAck;

/**
* An extended ReaderImpl used for StrategicTwoPhaseCompactor.
* The compaction consumer subscription is durable and consumes compacted messages from the earliest position.
* It does not acknowledge the message after each read. (needs to call acknowledgeCumulativeAsync to ack messages.)
*/
@Slf4j
public class CompactionReaderImpl<T> extends ReaderImpl<T> {

ConsumerBase<T> consumer;

ReaderConfigurationData<T> readerConfiguration;
private CompactionReaderImpl(PulsarClientImpl client, ReaderConfigurationData<T> readerConfiguration,
ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> consumerFuture,
Schema<T> schema) {
super(client, readerConfiguration, executorProvider, consumerFuture, schema);
this.readerConfiguration = readerConfiguration;
this.consumer = getConsumer();
}

public static <T> CompactionReaderImpl<T> create(PulsarClientImpl client, Schema<T> schema, String topic,
CompletableFuture<Consumer<T>> consumerFuture,
CryptoKeyReader cryptoKeyReader) {
ReaderConfigurationData<T> conf = new ReaderConfigurationData<>();
conf.setTopicName(topic);
conf.setSubscriptionName(COMPACTION_SUBSCRIPTION);
conf.setStartMessageId(MessageId.earliest);
conf.setStartMessageFromRollbackDurationInSec(0);
conf.setReadCompacted(true);
conf.setSubscriptionMode(SubscriptionMode.Durable);
conf.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
conf.setCryptoKeyReader(cryptoKeyReader);
return new CompactionReaderImpl<>(client, conf, client.externalExecutorProvider(), consumerFuture, schema);
}


@Override
public Message<T> readNext() throws PulsarClientException {
return consumer.receive();
}

@Override
public Message<T> readNext(int timeout, TimeUnit unit) throws PulsarClientException {
return consumer.receive(timeout, unit);
}

@Override
public CompletableFuture<Message<T>> readNextAsync() {
return consumer.receiveAsync();
}

public CompletableFuture<MessageId> getLastMessageIdAsync() {
return consumer.getLastMessageIdAsync();
}

public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Map<String, Long> properties) {
return consumer.doAcknowledge(messageId, CommandAck.AckType.Cumulative, properties, null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer;
import java.util.Set;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.proto.CompressionType;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodecNone;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.protocol.Commands;

/**
* A raw batch message container without producer. (Used for StrategicTwoPhaseCompactor)
*
* incoming single messages:
* (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
*
* batched into single batch message:
* [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]
*/
public class RawBatchMessageContainerImpl extends BatchMessageContainerImpl {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that this is just a need for a bulk message container based on the maximum number of batches, and I feel that extending the BatchMessageContainerImpl will become too heavy(There are various producer-related properties and operations in the BatchMessageContainer implementation), and it also adds to the complexity of the original BatchMessageContainer.

Maybe a simple batch container could be re-implemented. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to reuse the batching and payload serialization logic from the parent class.

I do not think this PR adds significant complexity to the parent class.
It just checks if the producer is null or not.

We could pass a mock producer if adding the producer null-check is not desirable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, sorry, I miss some code.

We could pass a mock producer if adding the producer null-check is not desirable.

I'm leaning towards this.

other, I wonder why batch processing is needed here? Is it because it is known that there is a performance bottleneck here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I am not fully convinced this mock producer is better than the null check. Can you explain why this could be better? The base class sets producer later too, it also has potential that producer could be null.

Obviously, We dont want to create ledger entry per message. especially this message payload is very small.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My main concern is that developers will miss this later when they modify it. Using mock producer is transfer behavior to RawBatchMessageContainerImpl .

I revisited the code, and the current implementation is not quite capable of the mock producer.

like this:

producer.client.getMemoryLimitController().releaseMemory(msg.getUncompressedSize()
+ batchAllocatedSizeBytes);

if (producer != null) {
ProducerImpl.LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(producer, prev -> Math.max(prev, msg.getSequenceId()));
}

BWT: In the original implementation, BatchContainer coupled with the producer I didn't think was very good. We should make BatchContiner independent, Leave things like releaseMemory to the producer. (This is off topic and does not affect this PR)

MessageCrypto msgCrypto;
Set<String> encryptionKeys;
CryptoKeyReader cryptoKeyReader;
public RawBatchMessageContainerImpl(int maxNumMessagesInBatch) {
super();
compressionType = CompressionType.NONE;
compressor = new CompressionCodecNone();
if (maxNumMessagesInBatch > 0) {
this.maxNumMessagesInBatch = maxNumMessagesInBatch;
}
}
private ByteBuf encrypt(ByteBuf compressedPayload) {
if (msgCrypto == null) {
return compressedPayload;
}
int maxSize = msgCrypto.getMaxOutputSize(compressedPayload.readableBytes());
ByteBuf encryptedPayload = allocator.buffer(maxSize);
ByteBuffer targetBuffer = encryptedPayload.nioBuffer(0, maxSize);

try {
msgCrypto.encrypt(encryptionKeys, cryptoKeyReader, () -> messageMetadata,
compressedPayload.nioBuffer(), targetBuffer);
} catch (PulsarClientException e) {
encryptedPayload.release();
compressedPayload.release();
discard(e);
throw new RuntimeException("Failed to encrypt payload", e);
}
encryptedPayload.writerIndex(targetBuffer.remaining());
compressedPayload.release();
return encryptedPayload;
}

@Override
public ProducerImpl.OpSendMsg createOpSendMsg() {
throw new UnsupportedOperationException();
}

/**
* Sets a CryptoKeyReader instance to encrypt batched messages during serialization, `toByteBuf()`.
* @param cryptoKeyReader a CryptoKeyReader instance
*/
public void setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
this.cryptoKeyReader = cryptoKeyReader;
}

/**
* Serializes the batched messages and return the ByteBuf.
* It sets the CompressionType and Encryption Keys from the batched messages.
* If successful, it calls `clear()` at the end to release buffers from this container.
*
* The returned byte buffer follows this format:
* [IdSize][Id][metadataAndPayloadSize][metadataAndPayload].
* This format is the same as RawMessage.serialize()'s format
* as the compacted messages is deserialized as RawMessage in broker.
*
* It throws the following runtime exceptions from encryption:
* IllegalStateException if cryptoKeyReader is not set for encrypted messages.
* IllegalArgumentException if encryption key init fails.
* RuntimeException if message encryption fails.
*
* @return a ByteBuf instance
*/
public ByteBuf toByteBuf() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some comments to describe which exceptions will throw in this method? Since this method is public, some others develop might use it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack.

I will add the comments when resolving comments from others.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

if (numMessagesInBatch > 1) {
messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
messageMetadata.setSequenceId(lowestSequenceId);
messageMetadata.setHighestSequenceId(highestSequenceId);
}
MessageImpl lastMessage = messages.get(messages.size() - 1);
MessageIdImpl lastMessageId = (MessageIdImpl) lastMessage.getMessageId();
MessageMetadata lastMessageMetadata = lastMessage.getMessageBuilder();

this.compressionType = lastMessageMetadata.getCompression();
this.compressor = CompressionCodecProvider.getCompressionCodec(lastMessageMetadata.getCompression());

if (!lastMessage.getEncryptionCtx().isEmpty()) {
EncryptionContext encryptionContext = (EncryptionContext) lastMessage.getEncryptionCtx().get();

if (cryptoKeyReader == null) {
IllegalStateException ex =
new IllegalStateException("Messages are encrypted but no cryptoKeyReader is provided.");
discard(ex);
throw ex;
}

encryptionKeys = encryptionContext.getKeys().keySet();
if (msgCrypto == null) {
msgCrypto =
new MessageCryptoBc(String.format(
"[%s] [%s]", topicName, "RawBatchMessageContainer"), true);
try {
msgCrypto.addPublicKeyCipher(encryptionKeys, cryptoKeyReader);
} catch (PulsarClientException.CryptoException e) {
discard(e);
throw new IllegalArgumentException("Failed to set encryption keys", e);
}
}
}

ByteBuf encryptedPayload = encrypt(getCompressedBatchMetadataAndPayload());
updateAndReserveBatchAllocatedSize(encryptedPayload.capacity());
ByteBuf metadataAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
messageMetadata, encryptedPayload);

MessageIdData idData = new MessageIdData();
idData.setLedgerId(lastMessageId.getLedgerId());
idData.setEntryId(lastMessageId.getEntryId());
idData.setPartition(lastMessageId.getPartitionIndex());

// Format: [IdSize][Id][metadataAndPayloadSize][metadataAndPayload]
// Following RawMessage.serialize() format as the compacted messages will be parsed as RawMessage in broker
int idSize = idData.getSerializedSize();
int headerSize = 4 /* IdSize */ + idSize + 4 /* metadataAndPayloadSize */;
int totalSize = headerSize + metadataAndPayload.readableBytes();
ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(totalSize);
buf.writeInt(idSize);
idData.writeTo(buf);
buf.writeInt(metadataAndPayload.readableBytes());
buf.writeBytes(metadataAndPayload);
encryptedPayload.release();
clear();
return buf;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public abstract class Compactor {

protected final ServiceConfiguration conf;
protected final ScheduledExecutorService scheduler;
private final PulsarClient pulsar;
private final BookKeeper bk;
protected final PulsarClient pulsar;
protected final BookKeeper bk;
protected final CompactorMXBeanImpl mxBean;

public Compactor(ServiceConfiguration conf,
Expand Down
Loading