Skip to content

Commit

Permalink
[improve][broker] added StrategicTwoPhaseCompactor
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed Oct 22, 2022
1 parent 330654a commit ec01d44
Show file tree
Hide file tree
Showing 23 changed files with 1,432 additions and 173 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.CommandAck;

/**
 * A {@link ReaderImpl} variant that reads through its underlying {@link ConsumerBase} directly.
 *
 * <p>Instances are built by {@link #create}, which configures the reader on the
 * {@code COMPACTION_SUBSCRIPTION} subscription in durable mode, starting from the earliest
 * message with {@code readCompacted} enabled.
 *
 * @param <T> the message payload type described by the schema
 */
@Slf4j
public class CompactionReaderImpl<T> extends ReaderImpl<T> {

    /** Consumer returned by {@code getConsumer()} at construction; every read and ack delegates to it. */
    ConsumerBase<T> consumer;

    /** The configuration this reader was constructed with. */
    ReaderConfigurationData<T> readerConfiguration;

    private CompactionReaderImpl(PulsarClientImpl client, ReaderConfigurationData<T> readerConfiguration,
                                 ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> consumerFuture,
                                 Schema<T> schema) {
        super(client, readerConfiguration, executorProvider, consumerFuture, schema);
        this.readerConfiguration = readerConfiguration;
        this.consumer = getConsumer();
    }

    /**
     * Creates a reader for {@code topic} on the compaction subscription.
     *
     * @param client          client used to create the reader; also supplies the executor provider
     * @param schema          schema of the messages to read
     * @param topic           name of the topic to read
     * @param consumerFuture  future handed to the parent {@link ReaderImpl} constructor
     * @param cryptoKeyReader key reader set on the reader configuration
     * @param <T>             the message payload type
     * @return the newly constructed reader
     */
    public static <T> CompactionReaderImpl<T> create(PulsarClientImpl client, Schema<T> schema, String topic,
                                                     CompletableFuture<Consumer<T>> consumerFuture,
                                                     CryptoKeyReader cryptoKeyReader) {
        // Parameterized (not raw) so the compiler checks the configuration against the reader's type.
        ReaderConfigurationData<T> conf = new ReaderConfigurationData<>();
        conf.setTopicName(topic);
        conf.setSubscriptionName(COMPACTION_SUBSCRIPTION);
        conf.setStartMessageId(MessageId.earliest);
        conf.setStartMessageFromRollbackDurationInSec(0);
        conf.setReadCompacted(true);
        conf.setSubscriptionMode(SubscriptionMode.Durable);
        conf.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
        conf.setCryptoKeyReader(cryptoKeyReader);
        return new CompactionReaderImpl<>(client, conf, client.externalExecutorProvider(), consumerFuture, schema);
    }

    /**
     * Returns the next message by delegating to the underlying consumer's {@code receive()}.
     *
     * @throws PulsarClientException if the underlying receive fails
     */
    @Override
    public Message<T> readNext() throws PulsarClientException {
        return consumer.receive();
    }

    /**
     * Returns the next message by delegating to the underlying consumer's timed {@code receive}.
     *
     * @param timeout the timeout value passed to the consumer
     * @param unit    the unit of {@code timeout}
     * @throws PulsarClientException if the underlying receive fails
     */
    @Override
    public Message<T> readNext(int timeout, TimeUnit unit) throws PulsarClientException {
        return consumer.receive(timeout, unit);
    }

    /**
     * Returns the future produced by the underlying consumer's {@code receiveAsync()}.
     */
    @Override
    public CompletableFuture<Message<T>> readNextAsync() {
        return consumer.receiveAsync();
    }

    /**
     * Returns the future produced by the underlying consumer's {@code getLastMessageIdAsync()}.
     */
    public CompletableFuture<MessageId> getLastMessageIdAsync() {
        return consumer.getLastMessageIdAsync();
    }

    /**
     * Cumulatively acknowledges {@code messageId} on the underlying consumer, attaching
     * {@code properties} to the ack. No transaction is used (the transaction argument is {@code null}).
     *
     * @param messageId  the message id to acknowledge cumulatively
     * @param properties properties passed along with the acknowledgement
     * @return the future returned by the underlying consumer
     */
    public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Map<String, Long> properties) {
        return consumer.doAcknowledgeWithTxn(messageId, CommandAck.AckType.Cumulative, properties, null);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import io.netty.buffer.ByteBuf;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.CompressionType;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.compression.CompressionCodecNone;
import org.apache.pulsar.common.protocol.Commands;

/**
 * A {@link BatchMessageContainerImpl} that serializes its batch into a single {@link ByteBuf}
 * (see {@link #toByteBuf()}) instead of producing a send operation.
 *
 * <p>The container is configured with {@link CompressionType#NONE} and does not encrypt messages.
 */
public class RawBatchMessageContainerImpl extends BatchMessageContainerImpl {

    /** Id of the message most recently passed to {@link #add}; {@code null} until the first add. */
    private MessageIdImpl lastMessageId;

    /**
     * @param maxNumMessagesInBatch batch size limit; values {@code <= 0} leave the inherited limit untouched
     */
    public RawBatchMessageContainerImpl(int maxNumMessagesInBatch) {
        super();
        compressionType = CompressionType.NONE;
        compressor = new CompressionCodecNone();
        if (maxNumMessagesInBatch > 0) {
            this.maxNumMessagesInBatch = maxNumMessagesInBatch;
        }
    }

    /**
     * Records the message's id as the last message id, then adds the message to the batch.
     *
     * @return the result of the parent container's {@code add}
     */
    @Override
    public boolean add(MessageImpl<?> msg, SendCallback callback) {
        lastMessageId = (MessageIdImpl) msg.getMessageId();
        return super.add(msg, callback);
    }

    /**
     * Not supported by this container; use {@link #toByteBuf()} instead.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public ProducerImpl.OpSendMsg createOpSendMsg() {
        throw new UnsupportedOperationException();
    }

    /**
     * Serializes the current batch as
     * {@code [IdSize][Id][metadataAndPayloadSize][metadataAndPayload]}, where the id is built from
     * the ledger id, entry id and partition index of the last added message.
     *
     * @return a newly allocated buffer holding the serialized batch; the caller owns it
     * @throws IllegalStateException if no message has been added yet
     */
    public ByteBuf toByteBuf() {
        if (lastMessageId == null) {
            // Fail with a descriptive error instead of a NullPointerException further down.
            throw new IllegalStateException("toByteBuf() called before any message was added to the batch");
        }

        if (numMessagesInBatch > 1) {
            messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
            messageMetadata.setSequenceId(lowestSequenceId);
            messageMetadata.setHighestSequenceId(highestSequenceId);
        }

        // TODO: Support message encryption like
        //  producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());
        // Currently, RawBatchMessageContainerImpl does not encrypt messages.
        // NOTE(review): ownership of the buffer returned by getCompressedBatchMetadataAndPayload()
        // is not visible here; confirm whether it must also be released after serialization.
        ByteBuf metadataAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
                messageMetadata, getCompressedBatchMetadataAndPayload());
        try {
            MessageIdData idData = new MessageIdData();
            idData.setLedgerId(lastMessageId.getLedgerId());
            idData.setEntryId(lastMessageId.getEntryId());
            idData.setPartition(lastMessageId.getPartitionIndex());

            // Format: [IdSize][Id][metadataAndPayloadSize][metadataAndPayload]
            // Following RawMessage.serialize() format as the compacted messages will be parsed as RawMessage in broker
            int idSize = idData.getSerializedSize();
            // Captured before writeBytes(), which consumes the source buffer's readable bytes.
            int metadataAndPayloadSize = metadataAndPayload.readableBytes();
            int headerSize = 4 /* IdSize */ + idSize + 4 /* metadataAndPayloadSize */;
            int totalSize = headerSize + metadataAndPayloadSize;

            ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(totalSize);
            buf.writeInt(idSize);
            idData.writeTo(buf);
            buf.writeInt(metadataAndPayloadSize);
            buf.writeBytes(metadataAndPayload);
            return buf;
        } finally {
            // The bytes were copied into the result (or serialization failed); this intermediate
            // buffer is referenced nowhere else, so release it to avoid leaking it.
            metadataAndPayload.release();
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public abstract class Compactor {

protected final ServiceConfiguration conf;
protected final ScheduledExecutorService scheduler;
private final PulsarClient pulsar;
private final BookKeeper bk;
protected final PulsarClient pulsar;
protected final BookKeeper bk;
protected final CompactorMXBeanImpl mxBean;

public Compactor(ServiceConfiguration conf,
Expand Down
Loading

0 comments on commit ec01d44

Please sign in to comment.