This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

ISSUE-18099: PIP-215: Configurable TopicCompactionStrategy for StrategicTwoPhaseCompactor and TableView #5006

Open
sijie opened this issue Oct 19, 2022 · 0 comments

sijie (Member) commented Oct 19, 2022

Original Issue: apache#18099


Motivation

Currently, the topic compaction logic implemented in TwoPhaseCompactor only keeps the last message for each key.

We want to make topic compaction configurable with different strategies. For example, to support conflict state resolution (race conditions) in PIP-192 (apache#16691), we need compaction to keep the first valid state for each key rather than the last one.

Goal

  • Create another topic compactor, StrategicTwoPhaseCompactor, which can be configured with a compaction strategy, TopicCompactionStrategy.

  • Update TableViewConfigurationData so that TableView loads the TopicCompactionStrategy and applies it when updating its internal key-value map.

  • Add TopicCompactionStrategy to the topic-level policy to select whether StrategicTwoPhaseCompactor or TwoPhaseCompactor runs when executing compaction.

  • Do not change the default behavior of topic compaction and table views. Enable this feature only when TopicCompactionStrategy is configured.

  • Make a conservative release. Initially, use this strategic compaction feature only for internal system topics. Do not expose it until it has proven stable and Pulsar users request it.

API Changes

```java
public interface TopicCompactionStrategy<T> {

    /**
     * Returns the schema object for this strategy.
     *
     * @return the schema used to decode message values for compaction
     */
    Schema<T> getSchema();

    /**
     * Tests whether compaction needs to keep the left (previous message value) rather than the right
     * (current message value) for the same key.
     *
     * @param prev previous message value
     * @param cur current message value
     * @return true if it needs to keep prev and ignore cur; otherwise, false
     */
    boolean shouldKeepLeft(T prev, T cur);

    /**
     * Checks whether merging is enabled. If enabled, compaction calls merge(prev, cur).
     *
     * @return true if merging is enabled
     */
    boolean isMergeEnabled();

    /**
     * Merges the previous message value and the current message value.
     *
     * @param prev previous message value
     * @param cur current message value
     * @return the merged value
     */
    T merge(T prev, T cur);

    /**
     * Instantiates a strategy from its fully qualified class name via its no-argument constructor.
     *
     * @return the strategy, or null if topicCompactionStrategy is null
     */
    @SuppressWarnings("unchecked")
    static <T> TopicCompactionStrategy<T> load(String topicCompactionStrategy) {
        if (topicCompactionStrategy == null) {
            return null;
        }
        try {
            // Load the class by name and create an instance reflectively.
            Class<?> clazz = Class.forName(topicCompactionStrategy);
            Object instance = clazz.getDeclaredConstructor().newInstance();
            return (TopicCompactionStrategy<T>) instance;
        } catch (Exception e) {
            throw new IllegalArgumentException("Error when loading topic compaction strategy.", e);
        }
    }
}
```
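For illustration, here is a minimal sketch of a strategy that keeps the first valid state for each key, in the spirit of the PIP-192 use case described in the Motivation. To stay self-contained without the Pulsar client, it implements a hypothetical mirror interface, `CompactionStrategySketch`, that omits `getSchema()`. The `State` enum, the `Ownership` value class, and the transition rules are likewise hypothetical and are not taken from PIP-192.

```java
// Hypothetical mirror of the proposed TopicCompactionStrategy, minus getSchema(),
// so this sketch compiles without Pulsar on the classpath.
interface CompactionStrategySketch<T> {
    boolean shouldKeepLeft(T prev, T cur);

    boolean isMergeEnabled();

    T merge(T prev, T cur);
}

// Hypothetical states with an assumed cycle: FREE -> OWNED -> RELEASING -> FREE.
enum State { FREE, OWNED, RELEASING }

// Hypothetical message value: a state plus the broker that produced it.
final class Ownership {
    final State state;
    final String broker;

    Ownership(State state, String broker) {
        this.state = state;
        this.broker = broker;
    }
}

// Keeps the first valid state: a newer value wins only if it is an allowed transition
// from the value already kept; conflicting values that arrive later are ignored.
final class FirstValidStateStrategy implements CompactionStrategySketch<Ownership> {

    // No-argument constructor, as needed by reflective loading such as TopicCompactionStrategy.load().
    public FirstValidStateStrategy() {
    }

    static boolean isValidTransition(State from, State to) {
        switch (from) {
            case FREE:
                return to == State.OWNED;
            case OWNED:
                return to == State.RELEASING;
            case RELEASING:
                return to == State.FREE;
            default:
                return false;
        }
    }

    @Override
    public boolean shouldKeepLeft(Ownership prev, Ownership cur) {
        if (prev == null) {
            return false; // nothing kept yet for this key, so accept cur
        }
        // Assumption: a null cur (for example, a deletion) is always accepted.
        return cur != null && !isValidTransition(prev.state, cur.state);
    }

    @Override
    public boolean isMergeEnabled() {
        return false; // this strategy only chooses between prev and cur
    }

    @Override
    public Ownership merge(Ownership prev, Ownership cur) {
        throw new UnsupportedOperationException("merge is disabled for this strategy");
    }
}
```

With this sketch, an OWNED claim from broker-a followed by an OWNED claim from broker-b for the same key makes `shouldKeepLeft` return true, so broker-a's earlier claim survives compaction.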
```java
public interface TableView<T> extends Closeable {
    ...

    /**
     * Performs the given action for each future entry in this map until the action throws an exception.
     *
     * @param action the action to be performed for each entry
     */
    void listen(BiConsumer<String, T> action);
}
```

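```java
public class TableViewConfigurationData implements Serializable, Cloneable {
    ...
    // Added: fully qualified class name of the TopicCompactionStrategy (null if not configured)
    private String topicCompactionStrategy;
}
```
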
```java
public class TableViewImpl<T> implements TableView<T> {

    private final TableViewConfigurationData conf;
    ...

    // Added
    private TopicCompactionStrategy<T> compactionStrategy;

    TableViewImpl(PulsarClientImpl client, Schema<T> schema, TableViewConfigurationData conf) {
        ...
        // Added: stays null when no strategy is configured, which keeps the default behavior
        this.compactionStrategy = TopicCompactionStrategy.load(conf.getTopicCompactionStrategy());
        ...
    }
    ...
}
```

```java
public class ReaderConfigurationData<T> implements Serializable, Cloneable {
    ...
    // Added
    private SubscriptionMode subscriptionMode = SubscriptionMode.NonDurable;

    // Added
    private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;
    ...
}
```

```java
public class CompactionReaderImpl<T> extends ReaderImpl<T> {

    private final ConsumerBase<T> consumer;

    private CompactionReaderImpl(PulsarClientImpl client, ReaderConfigurationData<T> readerConfiguration,
                                 ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> consumerFuture,
                                 Schema<T> schema) {
        super(client, readerConfiguration, executorProvider, consumerFuture, schema);
        this.consumer = getConsumer();
    }

    public static <T> CompactionReaderImpl<T> create(PulsarClientImpl client, Schema<T> schema, String topic,
                                                     CompletableFuture<Consumer<T>> consumerFuture) {
        ReaderConfigurationData<T> conf = new ReaderConfigurationData<>();
        conf.setTopicName(topic);
        conf.setSubscriptionName(COMPACTION_SUBSCRIPTION);
        conf.setStartMessageId(MessageId.earliest);
        conf.setAutoUpdatePartitions(true);
        conf.setAutoUpdatePartitionsIntervalSeconds(30);
        conf.setReadCompacted(true);
        conf.setPoolMessages(true);
        // Use a durable compaction subscription that starts from the earliest position.
        conf.setSubscriptionMode(SubscriptionMode.Durable);
        conf.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
        return new CompactionReaderImpl<>(client, conf, client.externalExecutorProvider(), consumerFuture, schema);
    }

    ...

    @Override
    public CompletableFuture<Message<T>> readNextAsync() {
        // Return the receive future without acknowledging; the caller acknowledges
        // explicitly through acknowledgeCumulativeAsync().
        return consumer.receiveAsync();
    }

    ...

    public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Map<String, Long> properties) {
        return consumer.doAcknowledgeWithTxn(messageId, CommandAck.AckType.Cumulative, properties, null);
    }
}
```

```java
public class StrategicTwoPhaseCompactor extends TwoPhaseCompactor {
    ...
    public <T> CompletableFuture<Long> compact(String topic,
                                               TopicCompactionStrategy<T> strategy) {
        CompletableFuture<Consumer<T>> consumerFuture = new CompletableFuture<>();
        CompactionReaderImpl<T> reader = CompactionReaderImpl.create(
                (PulsarClientImpl) pulsar, strategy.getSchema(), topic, consumerFuture);

        // Start compacting once the reader's consumer is ready.
        return consumerFuture.thenComposeAsync(__ -> compactAndCloseReader(reader, strategy), scheduler);
    }
    ...
}
```

Proposed pulsar-admin commands:

```shell
pulsar-admin topicPolicies set-compaction-strategy [options]
pulsar-admin topicPolicies get-compaction-strategy [options]
```

Implementation

# Goal 1:
- Create another topic compactor, `StrategicTwoPhaseCompactor`, where we can configure a compaction strategy, `TopicCompactionStrategy`.

StrategicTwoPhaseCompactor will have two phases.

First Phase:
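It uses CompactionReader<T> instead of RawReader, so that message values are decoded with strategy.getSchema(). It iterates over each message and compacts messages with the same key according to the TopicCompactionStrategy: when isMergeEnabled() returns true, it combines the values with merge(); otherwise, it uses shouldKeepLeft() to decide whether to keep the previous value or the current one.

The CompactionReader will be added to pulsar-broker only (not to pulsar-client).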
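The first phase can be sketched as an in-memory fold over the messages in topic order. This is a minimal sketch under stated assumptions: messages are modeled as key/value entries rather than Pulsar `Message<T>` objects, `KeyedStrategy` and `PhaseOneSketch` are hypothetical names, and the precedence (merge() when enabled, otherwise shouldKeepLeft()) is inferred from the interface's Javadoc rather than taken from the actual compactor. A strategy whose shouldKeepLeft() always returns false reproduces the last-value-per-key behavior of TwoPhaseCompactor.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for TopicCompactionStrategy, minus getSchema().
interface KeyedStrategy<T> {
    boolean shouldKeepLeft(T prev, T cur);

    boolean isMergeEnabled();

    T merge(T prev, T cur);
}

final class PhaseOneSketch {

    // Folds (key, value) messages, in topic order, into the value kept for each key.
    static <T> Map<String, T> compact(List<Map.Entry<String, T>> messages, KeyedStrategy<T> strategy) {
        Map<String, T> kept = new LinkedHashMap<>(); // preserves first-seen key order
        for (Map.Entry<String, T> message : messages) {
            String key = message.getKey();
            T cur = message.getValue();
            if (!kept.containsKey(key)) {
                kept.put(key, cur); // first message for this key
            } else if (strategy.isMergeEnabled()) {
                kept.put(key, strategy.merge(kept.get(key), cur));
            } else if (!strategy.shouldKeepLeft(kept.get(key), cur)) {
                kept.put(key, cur); // the current value replaces the previous one
            }
            // Otherwise, the previous value is kept and cur is dropped.
        }
        return kept;
    }
}
```

With a strategy whose shouldKeepLeft() always returns true, the same input keeps the first value for each key instead, which is the behavior the Motivation asks for.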

Second Phase:
The compacted messages will be written to a ledger.

# Goal 2:
- Update the `TableViewConfigurationData` to load and consider the `TopicCompactionStrategy` when updating the internal key-value map in `TableView`. 

When updating its internal key-value map, TableView will follow the same compaction logic defined in the TopicCompactionStrategy.
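A minimal sketch of how a table view could apply a strategy when it updates its map. The names `ViewStrategy`, `StrategicViewSketch`, and `handleMessage` are hypothetical and do not reflect TableViewImpl's actual internals. Only shouldKeepLeft() is modeled (merge() is omitted for brevity), a null strategy keeps the default last-value-wins behavior, and a null value is treated as a deletion, which is an assumption.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

// Hypothetical stand-in for TopicCompactionStrategy; only shouldKeepLeft() is modeled.
interface ViewStrategy<T> {
    boolean shouldKeepLeft(T prev, T cur);
}

// Hypothetical table view that applies the strategy to each incoming message.
final class StrategicViewSketch<T> {
    private final Map<String, T> data = new ConcurrentHashMap<>();
    private final List<BiConsumer<String, T>> listeners = new CopyOnWriteArrayList<>();
    private final ViewStrategy<T> strategy; // null: no strategy configured

    StrategicViewSketch(ViewStrategy<T> strategy) {
        this.strategy = strategy;
    }

    // Registers an action for each future accepted entry, similar in spirit to TableView.listen().
    void listen(BiConsumer<String, T> action) {
        listeners.add(action);
    }

    // Called for each message read from the topic, in order.
    void handleMessage(String key, T cur) {
        T prev = data.get(key);
        if (strategy != null && prev != null && strategy.shouldKeepLeft(prev, cur)) {
            return; // keep the existing value and do not notify listeners
        }
        if (cur == null) {
            data.remove(key); // assumption: a null value deletes the key
        } else {
            data.put(key, cur);
        }
        for (BiConsumer<String, T> listener : listeners) {
            listener.accept(key, cur);
        }
    }

    T get(String key) {
        return data.get(key);
    }
}
```

Skipping listener notification for rejected values means listeners only ever observe the values the strategy accepted, so they see the same state that compaction would keep.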

# Goal 3:
- Add `TopicCompactionStrategy` in Topic-level Policy to run `StrategicTwoPhaseCompactor` instead of `TwoPhaseCompactor` when executing compaction.

When running compaction, the broker will look up the TopicCompactionStrategy in the topic-level policy and run StrategicTwoPhaseCompactor if a strategy is configured. Otherwise, it runs TwoPhaseCompactor, as it does today.
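A minimal sketch of this selection logic, assuming hypothetical interfaces for the two compactors (`DefaultCompactorSketch`, `StrategicCompactorSketch`) and modeling the topic-level policy as an optional strategy class name; the real broker code and policy types may differ. Treating a blank value as unset is also an assumption.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for TwoPhaseCompactor and StrategicTwoPhaseCompactor.
interface DefaultCompactorSketch {
    CompletableFuture<Long> compact(String topic);
}

interface StrategicCompactorSketch {
    CompletableFuture<Long> compact(String topic, String strategyClassName);
}

final class CompactionDispatcher {

    // strategyPolicy: the topic-level compaction strategy class name, if one is configured.
    static CompletableFuture<Long> runCompaction(String topic, Optional<String> strategyPolicy,
                                                 DefaultCompactorSketch defaultCompactor,
                                                 StrategicCompactorSketch strategicCompactor) {
        return strategyPolicy
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                // A strategy is configured: run the strategic compactor with it.
                .map(name -> strategicCompactor.compact(topic, name))
                // No strategy configured: keep the default compaction behavior.
                .orElseGet(() -> defaultCompactor.compact(topic));
    }
}
```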

Alternatives

Why not resolve conflicts with a single broker (the leader broker) using two topics: an un-compacted topic and a compacted (pre-filtered) topic?

  • Brokers first broadcast messages to the un-compacted topic.
  • Only the leader broker consumes the un-compacted topic and compacts any conflicting messages. The leader then produces the compacted messages to the compacted topic (the single writer resolves conflicts).
  • Brokers consume the compacted topic (no need to compact the messages separately).

In the worst case, when there are many conflicting messages, this PIP can incur more repeated custom compaction than the alternative, because each consumer (topic compaction and table views) needs to compact the messages itself. However, one advantage of this proposal is that pub/sub is faster because it uses a single topic. For example, in PIP-192, if the "bundle assignment" broadcast is fast enough, it can reduce the number of conflicting bundle assignment requests (a broadcast filter effect).


sijie added the PIP label Oct 19, 2022
sijie changed the title from "ISSUE-18099: PIP-215: Configurable Topic Compaction Strategy" to "ISSUE-18099: PIP-215: Configurable TopicCompactionStrategy for StrategicTwoPhaseCompactor and TableView" Dec 9, 2022
sijie added the Stale label Dec 9, 2022