
[improve][pip] PIP-393: Improve performance of Negative Acknowledgement #23601

Merged · 11 commits · Dec 5, 2024

# PIP-393: Improve performance of Negative Acknowledgement

# Background knowledge

Negative Acknowledgement is a feature in Pulsar that allows consumers to trigger the redelivery
of a message after some time when they fail to process it. When user calls `negativeAcknowledge` method,
`NegativeAcksTracker` in `ConsumerImpl` will add an entry into the map `NegativeAcksTracker.nackedMessages`,
mapping the message ID to the redelivery time. When the redelivery time comes, `NegativeAcksTracker` will
send a redelivery request to the broker to redeliver the message.

# Motivation

There are several problems with the current implementation of Negative Acknowledgement in Pulsar:
- memory occupation is high;
- code execution efficiency is low;
- the redelivery time is not accurate;
- multiple negative acks for messages in the same entry (batch) interfere with each other.

All of these problems are severe and need to be solved.

## Memory occupation is high
After the improvement in https://github.com/apache/pulsar/pull/23582, we reduced the memory occupation
of `NegativeAcksTracker` by more than half by replacing `HashMap` with `ConcurrentLongLongPairHashMap`.
With 1,000,000 entries, the memory occupation decreases from 178MB to 64MB; with 10,000,000 entries, it
decreases from 1132MB to 512MB.
The average memory occupation per entry decreases from 1132MB/10000000 ≈ 118 bytes to 512MB/10000000 ≈ 53 bytes.

But that is not enough. Assuming we negatively acknowledge 10,000 messages per second with a 1h redelivery
delay for each message, the memory occupation of `NegativeAcksTracker` will be
`3600*10000*53/1024/1024/1024 = 1.77GB`; if the delay is 5h, the required memory is
`3600*10000*53*5/1024/1024/1024 = 8.88GB`, which grows too fast.

## Code execution efficiency is low
Currently, each time the timer task is triggered, it iterates over all entries in `NegativeAcksTracker.nackedMessages`,
which is unnecessary. We can sort entries by timestamp and iterate only the entries that are due for redelivery.

## Redelivery time is not accurate
Currently, the redelivery check interval is controlled by `timerIntervalNanos`, which is 1/3 of `negativeAckRedeliveryDelay`.
That means if `negativeAckRedeliveryDelay` is 1h, the check task runs every 20min, so the redelivery time can deviate
by up to 20min, which is unacceptable.

## Multiple negative acks for messages in the same entry (batch) interfere with each other
Currently, `NegativeAcksTracker#nackedMessages` maps `(ledgerId, entryId)` to `timestamp`, which means multiple nacks
for messages in the same batch share a single timestamp.
If we nack msg1 for redelivery 10s later and then nack msg2 for redelivery 20s later, both messages are redelivered
together 20s later. msg1 is not redelivered after 10s because the timestamp recorded in
`NegativeAcksTracker#nackedMessages` is overridden by the second nack call.


# Goals

Refactor the `NegativeAcksTracker` to solve the above problems.

To avoid iterating over all entries in `NegativeAcksTracker.nackedMessages`, we use a sorted map to store the entries.
To reduce memory occupation, we use utility classes provided by fastutil (https://fastutil.di.unimi.it/docs/) and design
a new data layout that can reduce the memory occupation to as little as 1% of the current implementation
(the actual effect depends on the configuration and the throughput).

# Detailed Design

## Design & Implementation Details

### New Data Structure
Use the following data structure to store the entries:
```java
Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> nackedMessages = new Long2ObjectAVLTreeMap<>();
```
mapping `timestamp -> ledgerId -> entryId`.
We need the timestamps sorted in ascending order, so we use a sorted map from timestamp to the `ledgerId -> entryId` map.
As the map may hold many entries, we use `Long2ObjectAVLTreeMap` instead of `Long2ObjectRBTreeMap`.
For the inner map, we use a `Long2ObjectMap` from `ledgerId` to the entry-id set because we don't need to keep the
`ledgerId`s ordered; `Long2ObjectOpenHashMap` is sufficient.
All entry ids for the same ledger id are stored in a bitmap, as we only care about the existence of each entry id.
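Since only membership matters, the per-ledger entry ids compress well in a bitmap. A minimal illustration using the JDK's `java.util.BitSet` as a stand-in for `Roaring64Bitmap` (which additionally compresses sparse 64-bit id ranges):

```java
import java.util.BitSet;

public class EntryIdSetDemo {
    public static void main(String[] args) {
        BitSet entryIds = new BitSet();
        // Record that entries 3, 4 and 1000 of this ledger were nacked.
        entryIds.set(3);
        entryIds.set(4);
        entryIds.set(1000);

        System.out.println(entryIds.get(4));        // true: entry 4 is tracked
        System.out.println(entryIds.get(5));        // false: entry 5 was never nacked
        System.out.println(entryIds.cardinality()); // 3: number of tracked entries
    }
}
```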


### TimeStamp Bucket
A timestamp in milliseconds is used as the key of the map. Since most use cases don't require 1ms precision for the
delay time, we can bucket the timestamps: we trim the lower bits of the timestamp to map it to a bucket.
For example, if we trim the lowest bit, the timestamps 0b1000 and 0b1001 are mapped to the same bucket 0b1000,
and all messages in the same bucket are redelivered at the same time.
If the user can accept a 1024ms deviation of the redelivery time, we can trim the lower 10 bits of the timestamp,
which groups many entries into the same bucket and reduces the memory occupation.

The following code snippet illustrates the design:
```java
static long trimLowerBit(long timestamp, int bits) {
    return timestamp & (-1L << bits);
}
```
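For instance, trimming 1 bit merges adjacent-millisecond timestamps into one bucket, and trimming 10 bits yields 1024ms-wide buckets; a quick check of the helper above:

```java
public class TrimDemo {
    static long trimLowerBit(long timestamp, int bits) {
        return timestamp & (-1L << bits);
    }

    public static void main(String[] args) {
        System.out.println(trimLowerBit(0b1000, 1)); // 8
        System.out.println(trimLowerBit(0b1001, 1)); // 8: same bucket as 0b1000
        System.out.println(trimLowerBit(1234, 10));  // 1024: bucket width 1024ms
    }
}
```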

```java
Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> map = new Long2ObjectAVLTreeMap<>();
Long2ObjectMap<Roaring64Bitmap> ledgerMap = new Long2ObjectOpenHashMap<>();
Roaring64Bitmap entrySet = new Roaring64Bitmap();
entrySet.addLong(entryId);
ledgerMap.put(ledgerId, entrySet);
map.put(timestamp, ledgerMap);
```

### Effect

#### Memory occupation is high
With this design, we can reduce the memory occupation of `NegativeAcksTracker` to as little as 1% of the current
implementation. Detailed test results will be provided in the PR.

#### Code execution efficiency is low
With the new design, we avoid iterating over all entries in `NegativeAcksTracker.nackedMessages` and iterate only
the buckets that are due for redelivery.
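The due-bucket scan can be sketched as follows, using JDK `TreeMap`/`HashMap`/`BitSet` as stand-ins for the fastutil and RoaringBitmap types (the helper names here are hypothetical; the actual code in the PR may differ):

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class NackScanSketch {
    // timestamp bucket -> ledgerId -> set of entryIds
    final TreeMap<Long, Map<Long, BitSet>> nacked = new TreeMap<>();

    void addNack(long redeliverAtMs, long ledgerId, int entryId) {
        nacked.computeIfAbsent(redeliverAtMs, t -> new HashMap<>())
              .computeIfAbsent(ledgerId, l -> new BitSet())
              .set(entryId);
    }

    // Collect and remove every bucket whose deadline has passed; buckets
    // after nowMs are never touched, unlike the old full-map iteration.
    Map<Long, BitSet> pollDue(long nowMs) {
        Map<Long, BitSet> due = new HashMap<>();
        Iterator<Map.Entry<Long, Map<Long, BitSet>>> it =
                nacked.headMap(nowMs, true).entrySet().iterator();
        while (it.hasNext()) {
            for (Map.Entry<Long, BitSet> e : it.next().getValue().entrySet()) {
                due.merge(e.getKey(), e.getValue(), (a, b) -> { a.or(b); return a; });
            }
            it.remove(); // removes the bucket from the backing TreeMap
        }
        return due;
    }

    public static void main(String[] args) {
        NackScanSketch tracker = new NackScanSketch();
        tracker.addNack(100, 7L, 1);
        tracker.addNack(5_000, 7L, 2);
        Map<Long, BitSet> due = tracker.pollDue(1_000);
        System.out.println(due.get(7L).get(1)); // true: bucket 100 is due
        System.out.println(tracker.nacked.size()); // 1: bucket 5000 still pending
    }
}
```

`TreeMap.headMap(nowMs, true)` is a view of only the buckets with deadlines at or before `nowMs`, so each timer run costs O(due buckets · log n) rather than O(all entries).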

#### Redelivery time is not accurate
With the new design, we avoid the fixed interval of the redelivery check and can control the precision of the
redelivery time by trimming the lower bits of the timestamp. If the user can accept a 1024ms deviation of the
redelivery time, we can trim the lower 10 bits of the timestamp, which groups many entries into the same bucket
and reduces the memory occupation.

#### Multiple negative ack for messages in the same entry(batch) will interfere with each other
With the new design, if we nack msg1 for redelivery 10s later and then nack msg2 for redelivery 20s later, the two
nacks will not interfere with each other, as they are stored in different buckets.


### Configuration

Add a new configuration `negativeAckPrecisionBitCnt` to control the precision of the redelivery time.
```java
@ApiModelProperty(
        name = "negativeAckPrecisionBitCnt",
        value = "The redelivery time precision bit count. The lower bits of the redelivery time will be\n"
                + "trimmed to reduce the memory occupation. The default value is 8, which means the redelivery time\n"
                + "will be bucketed by 256ms. In the worst case, the redelivery time will be 512ms earlier (never later)\n"
                + "than the expected time. If the value is 0, the redelivery time will be accurate to the millisecond."
)
private long negativeAckPrecisionBitCnt = 8;
```
The higher the value, the more entries are grouped into the same bucket, the lower the memory occupation, and the
less accurate the redelivery time. The default value is 8, which means the redelivery time is bucketed by 256ms;
in the worst case, a message is redelivered 512ms earlier (never later) than the expected time.


# Backward & Forward Compatibility

## Upgrade

Users can upgrade to the new version without any compatibility issues.

## Downgrade / Rollback

Users can downgrade to the old version without any compatibility issues.

# Links

<!--
Updated afterwards
-->
* Mailing List discussion thread: https://lists.apache.org/thread/yojl7ylk7cyjxktq3cn8849hvmyv0fg8
* Mailing List voting thread: