Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][pip] PIP-282: Change definition of the recently joined consumers position #20776

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
293 changes: 293 additions & 0 deletions pip/pip-282.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
# Background knowledge
poorbarcode marked this conversation as resolved.
Show resolved Hide resolved

Key_Shared is one of the subscription types which allows multiple consumer connections.
Messages are distributed across consumers, and messages with the same key or same ordering key are delivered to only one consumer.
No matter how many times the message is re-delivered, it is delivered to the same consumer.

When disabling `allowOutOfOrderDelivery`, Key_Shared subscription guarantees a key will be processed in order by a single consumer, even if a new consumer is connected.

# Motivation

Key_Shared has a mechanism called the "recently joined consumers" to keep message ordering.
However, currently, it doesn't care about some corner cases.
More specifically, we found two out-of-order issues cased by:

1. [issue-1] The race condition in the "recently joined consumers", where consumers can be added before finishing reading and dispatching messages from ledgers.
2. [issue-2] Messages could be added to messagesToRedeliver without consumer-side operations such as unacknowledgement.

We should care about these cases in Key_Shared subscription.

## [issue-1]

Key_Shared subscription has out-of-order cases because of the race condition of [the "recently joined consumers"](https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L378-L386).
Consider the following flow.

1. Assume that the current read position is `1:6` and the recently joined consumers is empty.
2. Called [OpReadEntry#internalReadEntriesComplete](https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java#L92-L95) from thread-1.
Then, the current read position is updated to `1:12` (Messages from `1:6` to `1:11` have yet to be dispatched to consumers).
3. Called [PersistentStickyKeyDispatcherMultipleConsumers#addConsumer](https://github.com/apache/pulsar/blob/35e9897742b7db4bd29349940075a819b2ad6999/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L130-L139) from thread-2.
Then, the new consumer is stored to the recently joined consumers with read position `1:12`.
4. Called [PersistentDispatcherMultipleConsumers#trySendMessagesToConsumers](https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L169) from thread-5.
Then, messages from `1:6` to `1:11` can be dispatched to the new consumer since the "recently joined consumers" allow brokers to send messages before the joined position (i.e., `1:12` here). **However, it is not expected.**
For example, if existing consumers have some unacked messages, disconnecting, and redelivering them can cause out-of-order.

An example scenario is shown below.

1. Assume that the [entries](https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L169) has the following messages, and the dispatcher has two consumers (`c1` `messagesForC` is 1, `c2` `messageForC` is 1000), and the selector will return `c1` if `key-a` and `c2` if `key-b`.
- `1:6` key: `key-a`
- `1:7` key: `key-a`
- `1:8` key: `key-a`
- `1:9` key: `key-b`
- `1:10` key: `key-b`
- `1:11` key: `key-b`
2. Send `1:6` to `c1` and `1:9` - `1:11` to `c2`.
- So, the current read position is `1:12`.
- `c1` never acknowledge `1:6`.
3. Add new consumer `c3`, the selector will return `c3` if `key-a`, and the `recentlyJoinedConsumers` is `{c3=1:12}`
4. Send `1:7` - `1:8` to `c3` because `1:7`, and `1:8` are less than the recently joined consumers position, `1:12`.
5. Disconnect `c1`.
6. Send `1:6` to `c3`.
As a result `c3` receives messages with the following order: `1:7`, `1:8`, `1:6` // out-of-order

## [issue-2]
Key_Shared subscription has out-of-order cases because messages could be added to messagesToRedeliver without consumer-side operations such as unacknowledgement.
Consider the following flow.

1. Assume that,
readPosition: `2:1`
messagesToRedeliver: []
recentlyJoinedConsumers: []
c1: messagesForC: 1, pending: []
c2: messagesForC: 1000, pending: [] // Necessary to ensure that the dispatcher reads entries even if c1 has no more permits.
selector: key-a: c1
2. Dispatch `2:1` (key: `key-a`, type: Normal)
readPosition: `2:2`
messagesToRedeliver: []
recentlyJoinedConsumers: []
c1: messagesForC: 0, pending: [`2:1`]
c2: messagesForC: 1000, pending: []
selector: key-a: c1
3. Try to dispatch `2:2` (key: `key-a`, type: Normal), but it can't be sent to c1 because c1 has no more permits. Then, it is added to messagesToRedeliver.
readPosition: `2:3`
messagesToRedeliver: [`2:2`]
recentlyJoinedConsumers: []
c1: messagesForC: 0, pending: [`2:1`]
c2: messagesForC: 1000, pending: []
selector: key-a: c1
4. Add consumer c3
readPosition: `2:3`
messagesToRedeliver: [`2:2`]
recentlyJoinedConsumers: [c3: `2:3`]
c1: messagesForC: 0, pending: [`2:1`]
c2: messagesForC: 1000, pending: []
c3: messagesForC: 1000, pending: []
selector: key-a: c3 // modified
5. Dispatch `2:2` (key: `key-a`, type: Replay) from messagesToRedeliver.
readPosition: `2:3`
messagesToRedeliver: []
recentlyJoinedConsumers: [c3: `2:3`]
c1: messagesForC: 0, pending: [`2:1`]
c2: messagesForC: 1000, pending: []
c3: messagesForC: 999, pending: [`2:2`]
selector: key-a: c3
6. Disconnect c1 and redelivery `2:1`
readPosition: `2:3`
messagesToRedeliver: []
recentlyJoinedConsumers: [c3: `2:3`]
c2: messagesForC: 1000, pending: []
c3: messagesForC: 998, pending: [`2:2`, `2:1`] // out-of-order
selector: key-a: c3

# Goals

## In Scope

Fix out-of-order issues above.

## Out of Scope

Simplify or improve the specification of Key_Shared.

# High Level Design

The root cause of the issues described above is that `recentlyJoinedConsumers` uses "read position" as joined positions for consumers, because this does not guarantee that messages less than or equal to it have already been scheduled to be sent.
Instead, we propose to use "last sent position" as joined positions for consumers.

Also, change (or add) some stats to know Key_Shared subscription status easily.

# Detailed Design

## Design & Implementation Details

First, introduce the new position, like the mark delete position and the individually deleted messages. In other words,

- All positions less than or equal to it are already scheduled to be sent.
- Manage individually sent positions to update the position as expected.

An example of updating the individually sent messages and the last sent position will be as follows.

Initially, the last sent position is `3:0`, and the individually sent positions is `[]`.
1. Read `3:1` - `3:10` positions
2. Send `3:1` - `3:3`, `3:5`, and `3:8` - `3:10` positions
- last sent position: `3:3`
- individually sent positions: `[(3:4, 3:5], (3:7, 3:10]]`
3. Send `3:7` position
- last sent position: `3:3`
- individually sent positions: `[(3:4, 3:5], (3:6, 3:10]]`
4. Send `3:6` position
- last sent position: `3:3`
- individually sent positions: `[(3:4, 3:10]]`
5. Send `3:4` position
- last sent position: `3:10`
- individually sent positions: `[]`

More specifically, the recently joined consumers related fields will be as follows.
```diff
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 8f05530f58b..2b17c580832 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -69,8 +69,12 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
* This means that, in order to preserve ordering, new consumers can only receive old
* messages, until the mark-delete position will move past this point.
*/
+ // Map(key: recently joined consumer, value: last sent position when joining)
private final LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers;

+ private PositionImpl lastSentPosition;
+ private final RangeSetWrapper<PositionImpl> individuallySentPositions;
poorbarcode marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • individuallySentPositions = {Range-Full} - {cursor.individualDeletedMessages} - {dispatcher.redeliveryMessages} - {the positions in inflight Replay Reading}, right?
  • Should we add metrics to describe how much memory individuallySentPositions usage?
  • Should we add a mechanism to stop delivering messages to the client if individuallySentPositions uses more memory than expected?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@equanz Sorry for the late reply. Could you take a look at these questions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

individuallySentPositions = {Range-Full} - {cursor.individualDeletedMessages} - {dispatcher.redeliveryMessages} - {the positions in inflight Replay Reading}, right?

Roughly, the {cursor.individualDeletedMessages} is not subtracted. It is initialized by individualDeletedMessages. Messages scheduled to be sent are pushed to this field.
After, if the first range is contiguous to the last sent position, remove the first range and update the last sent position to the range's upper bound.
So, if it is not initialized by individualDeletedMessages, then the last sent position can be stuck because of "sent-hole".

More specifically, the details may differ because the definitions are different.

Should we add metrics to describe how much memory individuallySentPositions usage?

I'll add these metrics to the subscription stats.

    /** The last sent position of the cursor. This is for Key_Shared subscription. */
    public String lastSentPosition;

    /** Set of individually sent ranges. This is for Key_Shared subscription. */
    public String individuallySentPositions;

Should we add a mechanism to stop delivering messages to the client if individuallySentPositions uses more memory than expected?

I referred to the definition of the individualDeletedMessages. It has no limitation to persist on the memory(not the storage).
(Of course, we can add the limitation if necessary.)

Copy link
Contributor

@poorbarcode poorbarcode Sep 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@equanz

I'll add these metrics to the subscription stats.

/** The last sent position of the cursor. This is for Key_Shared subscription. */
public String lastSentPosition;

/** Set of individually sent ranges. This is for Key_Shared subscription. */
public String individuallySentPositions;

But individuallySentPositions in topic stats doesn't accurately reflect how much memory individuallySentPositions uses, right?

If there are a huge number of elements in individuallySentPositions, it is possible that pulsar-admin topics stats might not work, because the response body is too large (we have encountered responses close to 200m).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@poorbarcode

But individuallySentPositions in topic stats doesn't accurately reflect how much memory individuallySentPositions uses, right?

Yes.
However, I think we can't do effective operations even if we can observe these metrics. What do you think?

If there are a huge number of elements in individuallySentPositions, it is possible that pulsar-admin topics stats might not work, because the response body is too large (we have encountered responses close to 200m).

Your concerns are correct. I'll reconsider any other approaches(e.g. add a new REST API to expose these stats, just output log as debug level).
BTW, it is not considered in stats(e.g. consumersAfterMarkDeletePosition, keyHashRanges) and stats-internal(e.g. individuallyDeletedMessages). We should care about these stats, too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@equanz

it is not considered in stats(e.g. consumersAfterMarkDeletePosition, keyHashRanges) and stats-internal(e.g. individuallyDeletedMessages). We should care about these stats, too.

Yes, I'm planning to do it.

Your concerns are correct. I'll reconsider any other approaches(e.g. add a new REST API to expose these stats, just output log as debug level).

After we added new metrics which indicate how much memory is used by individuallySentPositions. Push an alert when the memory limit is exceeded, so there are no scenarios that the HTTP API cannot handle

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A new PIP to limit the place usage of individuallyDeletedMessages, the PIP will be submitted today

Okay.
If implementation is not far off, I will wait for your new design for reference.
(This PIP issue is one of bug fixes. So, I think we should fix it sooner if possible.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. It is #21118

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@poorbarcode
Sorry for the late reply. I have reconsidered your comments.

This field is appended when messages sent are not contiguous, and is automatically removed when the delivery becomes contiguous.
The frequency of occurrence between instances where messages are not delivered and instances where messages are not acked, would likely differ.
We could add limitations as you mentioned. However, it raises questions about how often this would be effective.

Additionally, introducing such additional stoppage specifications for deliveries would impose more complex limitations on Key_Shared alone.
If these limitations work correctly, that would be ideal. However, past bugs suggest that these complexities often cause omission from consideration.
Hence, if these added limitations do not effectively contribute, it may be better to consider not imposing them in the first place.

+
PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,
Subscription subscription, ServiceConfiguration conf, KeySharedMeta ksm) {
super(topic, cursor, subscription, ksm.isAllowOutOfOrderDelivery());
```

Next, rename the consumer stats as follows.
```diff
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
@@ -74,8 +74,8 @@ public class ConsumerStatsImpl implements ConsumerStats {
/** Flag to verify if consumer is blocked due to reaching threshold of unacked messages. */
public boolean blockedConsumerOnUnackedMsgs;

- /** The read position of the cursor when the consumer joining. */
- public String readPositionWhenJoining;
+ /** The last sent position of the cursor when the consumer joining. */
+ public String lastSentPositionWhenJoining;

/** Address of this consumer. */
private String address;
```

Note that I just renamed the stats from `readPositionWhenJoining` to `lastSentPositionWhenJoining` without keeping the backward-compatibility because readPositionWhenJoining is no longer meaningful and redundant.

And finally, modify the subscription stats of the definition as follows.
```diff
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index dc666f3a18e..7591369277f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1177,7 +1177,14 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
.getRecentlyJoinedConsumers();
if (recentlyJoinedConsumers != null && recentlyJoinedConsumers.size() > 0) {
recentlyJoinedConsumers.forEach((k, v) -> {
- subStats.consumersAfterMarkDeletePosition.put(k.consumerName(), v.toString());
+ // The dispatcher allows same name consumers
+ final StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append("consumerName=").append(k.consumerName())
+ .append(", consumerId=").append(k.consumerId());
+ if (k.cnx() != null) {
+ stringBuilder.append(", address=").append(k.cnx().clientAddress());
+ }
+ subStats.consumersAfterMarkDeletePosition.put(stringBuilder.toString(), v.toString());
});
}
}
```

## How The Proposal Resolves The Issue

**[issue-1]**
Consider the following flow.

1. Assume that the [entries](https://github.com/apache/pulsar/blob/e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L169) has the following messages, and the dispatcher has two consumers (`c1` `messagesForC` is 1, `c2` `messageForC` is 1000), and the selector will return `c1` if `key-a` and `c2` if `key-b`.
- `1:6` key: `key-a`
- `1:7` key: `key-a`
- `1:8` key: `key-a`
- `1:9` key: `key-b`
- `1:10` key: `key-b`
- `1:11` key: `key-b`
2. Send `1:6` to `c1` and `1:9` - `1:11` to `c2`.
- So, the current last sent position is `1:6` and the individually sent positions is `[(1:8, 1:11]]`.
- `c1` never acknowledge `1:6`.
3. Add new consumer `c3`, the selector will return `c3` if `key-a`, and the `recentlyJoinedConsumers` is `{c3=1:6}`.
4. Can't send `1:7` - `1:8` to `c3` because `1:7`, and `1:8` are greater than the recently joined consumers position, `1:6`.
5. Disconnect `c1`.
6. Send `1:6` - `1:8` to `c3`.
Now, `c3` receives messages with expected order regarding `key-a`.

**[issue-2]**
This mechanism guarantees all messages less than or equal to the last sent position are already scheduled to be sent. Therefore, skipped messages (e.g. `2:2`) are greater than the last sent position.

1. The last sent position is `2:1`.
2. When add new consumer `c3`, `recentlyJoinedConsumers` is `[{c3: 2:1}]`.
The dispatcher can't send `2:2` to `c3` because `2:2` is greater than the joined position `2:1`.
3. When `c3` receives `2:1` and acknowledges it, then the mark delete position is advanced to `2:1`.
When all messages up to the joined position (i.e., `2:1` ) have been acknowledged, then the consumer (i.e., `c3` ) is removed from `recentlyJoinedConsumers`.
Therefore, `c3` will be able to receive `2:2`.

**[stats]**
`readPositionWhenJoining` is replaced with `lastSentPositionWhenJoining` in each consumer stats instead.

## Public-facing Changes

### Public API

### Binary protocol

### Configuration

### CLI

### Metrics
* The consumer stats `readPositionWhenJoining` is renamed to `lastSentPositionWhenJoining`.
* The subscription stats `consumersAfterMarkDeletePosition` of the definition is modified as described.

# Monitoring

# Security Considerations

# Backward & Forward Compatability

## Revert

## Upgrade

# Alternatives

### Alternative-1
See https://github.com/apache/pulsar/pull/20179 in detail. It isn't merged when publishing this proposal.
The only difference is the message key, i.e., this approach leverages per-key information in addition to the proposal described in this PIP.
For example, the `recentlyJoinedConsumers` will be:

```
// Map(key: recently joined consumer, value: Map(key: message key, value: last sent position in the key when joining))
private final LinkedHashMap<Consumer, Map<ByteBuffer, PositionImpl>> recentlyJoinedConsumers;
```

With this change, message delivery stuck on one key will no longer prevent other keys from being dispatched.
However, the codes will be vulnerable to an increase in keys, causing OOM in the worst case.

### Alternative-2
Make updating the read position, dispatching messages, and adding new consumers exclusive to ensure that messages less than the read position have already been sent.
However, introducing such an exclusion mechanism disrupts the throughput of the dispatcher.

# General Notes

# Links

<!--
Updated afterwards
-->
* Mailing List discussion thread: https://lists.apache.org/thread/69fpb0d30y7pc02k3zvg2lpb2lj0smdg
* Mailing List voting thread: https://lists.apache.org/thread/45x056t8njjnzflbkhkofh00gcy4z5g6