Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] [pip] PIP-299: Stop dispatch messages if the individual acks will be lost in the persistent storage #21118

Merged
merged 24 commits into from
Oct 12, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions pip/pip-299.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Background knowledge

The config `managedLedgerMaxUnackedRangesToPersist`

Indicates the number of `acknowledgment holes` that are going to be persistently stored. When acknowledging out of order, a consumer will leave holes that are supposed to be quickly filled by acking all the messages. The information of which messages are acknowledged is persisted by compressing in `ranges` of messages that were acknowledged. After the maximum number of ranges is reached, the information will only be tracked in memory, and messages will be redelivered in case of crashes.

The cursor metadata contains the following three data:
- Subscription properties(Usually, this is small); this data part only persists to the ZK node.
- The last sequence ID of each producer. It only exists for the cursor `pulsar.dedup`. If a topic has many, many producers, this part of the data will be large. See [PIP-6:-Guaranteed-Message-Deduplication](https://github.com/apache/pulsar/wiki/PIP-6:-Guaranteed-Message-Deduplication) for more details.
- Individual Deleted Messages(including the acknowledgment of batched messages). This part of the data occupies most of the cursor metadata's space, which is the focus of this proposal.

Differ with Kafka: Pulsar supports [individual acknowledgment](https://pulsar.apache.org/docs/2.11.x/concepts-messaging/#acknowledgment) (just like ack `{pos-1, pos-3, pos-5}`), so instead of a pointer(acknowledged on the left and un-acknowledged on the right), Pulsar needs to persist the acknowledgment state of each message, we call these records `Individual Deleted Messages.`

The current persistence mechanism of the cursor metadata(including `Individual Deleted Messages`) works like this:
1. Write the data of cursor metadata(including `Individual Deleted Messages`) to BK in one Entry; by default, the maximum size of the Entry is 5MB.
2. Write the data of cursor metadata(optional to include `Individual Deleted Messages`) to the Metadata Store(such as ZK) if BK-Write fails; data of a Metadata Store Node that is less than 10MB is recommended. Since writing large chunks of data to the Metadata Store frequently makes the Metadata Store work unstable, this is only a backstop measure.

Is 5MB enough? `Individual Deleted Messages` consists of Position_Rang(each Position_Rang occupies 32 bytes; the implementation will not be explained in this proposal). This means that the Broker can persist `5m / 32bytes` number of Position_Rang for each Subscription, and there is an additional compression mechanism at work, so it is sufficient for almost all scenarios except the following three scenarios:
- Client Miss Acknowledges: Clients receive many messages, and ack some of them, the rest still need to be acknowledged due to errors or other reasons. As time goes on, more and more records will be staying there.
- Delay Messages: Long-delayed and short-delayed messages are mixed, with only the short-delayed message successfully consumed and the long-delayed message not delivered. As time goes on, more and more records will be staying there.
- Large Number of Consumers: If the number of consumers is large and each has some discrete ack records, all add up to a large number.
- Large Number of Producers: If the number of producers is large, there might be a large data of Last Sequence ID to persist. This scenario only exists on the `pulsar.dedup` cursor.

The config `managedLedgerMaxUnackedRangesToPersist`
If the cursor metadata is too large to persist, the Broker will persist only part of the data according to the following priorities.
- Subscription Properties. This part can't be split up; persist will fail if this part is too large to persist.
- Last sequence ID of producers. This part can't be split up; persist will fail if this part is too large to persist.
- Individual Deleted Message. If it is too large, only one part persists, and then the other part is maintained only in memory

# Motivation

Since the frequent persistence of `Individual Deleted Messages` will magnify the amount of BK Written and increase the latency of ack-response, the Broker does not immediately persist it when receiving a consumer's acknowledgment but persists it regularly.

The data of cursor metadata is recommended to be less than 5MB; if a subscription's `Individual Deleted Messages` data is too large to persist, as the program grows for a long time, there will be more and more non-persistent data. Eventually, there will be an unacceptable amount of repeated consumption of messages when the Broker restarts.

# Goal

## In Scope

To avoid repeated consumption due to the cursor metadata being too large to persist.

## Out of Scope

This proposal will not care about this scenario: if so many producers make the metadata of cursor `pulsar.dedup` cannot persist, the task `Take Deduplication Snapshot` will be in vain due to the inability to persist.

# High-Level Design

Provide a new config named `dispatcherPauseOnAckStatePersistentEnabled`(default value is `false`) for a new feature: stop dispatch messages to clients when reaching the limitation `managedLedgerMaxUnackedRangesToPersist`.
- If the user does not care about that Individual Deleted Messages can not be fully persistent, resulting in a large number of repeated message consumption, then it can be set to `false`.
- If the user cares about repeated consumption, at can accept a decline in consumption speed when cursor metadata is too large to persist, it can be set to `true`.


# Detailed Design
### Public API

**broker.conf**
```
/**
* After enabling this feature, Pulsar will stop delivery messages to clients if the cursor metadata is too large to persist, it will help to reduce the duplicates caused by the ack state that can not be fully persistent. Default "false".
*/
boolean dispatcherPauseOnAckStatePersistentEnabled;
```

**SubscriptionStats**
```java
/**
* After enabling the feature "dispatcherPauseOnAckStatePersistentEnabled", return "true" if the cursor metadata is too large to persist, else return "false".
* Always return "false" if disabled the feature "dispatcherPauseOnAckStatePersistentEnabled".
*/
boolean isBlockedOnAckStatePersistent();
```

## Design & Implementation Details

Cache the range count of the Individual Deleted Messages in the memory when doing persist cursor metadata to BK. Stuck delivery messages to clients if reaching the limitation `managedLedgerMaxUnackedRangesToPersist`.

Since the cache will not be updated in time, the actual count will decrease when clients acknowledge messages(but it does not persist immediately, so the cached value does not update immediately), the cached value is an estimated value.

# Metrics & Alert

Nothing.