Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Optimize Consumer.individualAck to avoid multiple call pendingAck to find acked consumer. #27

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -485,14 +486,29 @@ public CompletableFuture<Void> messageAcked(CommandAck ack) {

//this method is for individual ack not carry the transaction
private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String, Long> properties) {
List<Position> positionsAcked = new ArrayList<>();
List<Position> positionsAcked = new ArrayList<>(ack.getMessageIdsCount());
// use for record position and ack consumers
HashMap<PositionImpl, Consumer> ackConsumerMapping = new HashMap<>();

long totalAckCount = 0;
for (int i = 0; i < ack.getMessageIdsCount(); i++) {
MessageIdData msgId = ack.getMessageIdAt(i);
PositionImpl position;

long ledgerId = msgId.getLedgerId();
long entryId = msgId.getEntryId();
Consumer ackOwnerConsumer = this;

long batchSize = 1;
if (Subscription.isIndividualAckMode(subType)) {
ConsumerAndLongPair consumerAndLongPair = getAckOwnerConsumerAndLongPair(ledgerId, entryId);
if (consumerAndLongPair.hasResult()) {
ackOwnerConsumer = consumerAndLongPair.consumer;
batchSize = consumerAndLongPair.pendingAckLongPair.first;
}
}

long ackedCount = 0;
long batchSize = getBatchSize(msgId);
Consumer ackOwnerConsumer = getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId());
if (msgId.getAckSetsCount() > 0) {
long[] ackSets = new long[msgId.getAckSetsCount()];
for (int j = 0; j < msgId.getAckSetsCount(); j++) {
Expand All @@ -511,12 +527,15 @@ private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String,
} else {
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position, ackOwnerConsumer);
if (checkCanRemovePendingAcksAndHandle(position, msgId)) {
if (checkCanRemovePendingAcksAndHandleWithOwnedConsumer(position, msgId, ackOwnerConsumer)) {
addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
}
}

positionsAcked.add(position);
if (position.hasAckSet()) {
ackConsumerMapping.put(position, ackOwnerConsumer);
}

checkAckValidationError(ack, position);

Expand All @@ -532,7 +551,8 @@ private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String,
if (((PositionImpl) position).getAckSet() != null) {
if (((PersistentSubscription) subscription)
.checkIsCanDeleteConsumerPendingAck((PositionImpl) position)) {
removePendingAcks((PositionImpl) position);
removePendingAckWithOwnedConsumer((PositionImpl) position,
ackConsumerMapping.get(position));
}
}
}));
Expand Down Expand Up @@ -602,24 +622,6 @@ private CompletableFuture<Long> individualAckWithTransaction(CommandAck ack) {
return completableFuture.thenApply(__ -> totalAckCount.sum());
}

private long getBatchSize(MessageIdData msgId) {
long batchSize = 1;
if (Subscription.isIndividualAckMode(subType)) {
LongPair longPair = pendingAcks.get(msgId.getLedgerId(), msgId.getEntryId());
// Consumer may ack the msg that not belongs to it.
if (longPair == null) {
Consumer ackOwnerConsumer = getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId());
longPair = ackOwnerConsumer.getPendingAcks().get(msgId.getLedgerId(), msgId.getEntryId());
if (longPair != null) {
batchSize = longPair.first;
}
} else {
batchSize = longPair.first;
}
}
return batchSize;
}

private long getAckedCountForMsgIdNoAckSets(long batchSize, PositionImpl position, Consumer consumer) {
if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType)) {
long[] cursorAckSet = getCursorAckSet(position);
Expand Down Expand Up @@ -686,16 +688,22 @@ private boolean checkCanRemovePendingAcksAndHandle(PositionImpl position, Messag
return false;
}

private boolean checkCanRemovePendingAcksAndHandleWithOwnedConsumer(PositionImpl position,
MessageIdData msgId,
Consumer ackOwnerConsumer) {
if (Subscription.isIndividualAckMode(subType) && msgId.getAckSetsCount() == 0) {
return removePendingAckWithOwnedConsumer(position, ackOwnerConsumer);
}
return false;
}


private Consumer getAckOwnerConsumer(long ledgerId, long entryId) {
Consumer ackOwnerConsumer = this;
if (Subscription.isIndividualAckMode(subType)) {
if (!getPendingAcks().containsKey(ledgerId, entryId)) {
for (Consumer consumer : subscription.getConsumers()) {
if (consumer != this && consumer.getPendingAcks().containsKey(ledgerId, entryId)) {
ackOwnerConsumer = consumer;
break;
}
}
ConsumerAndLongPair ackOwnerConsumerAndLongPair = getAckOwnerConsumerAndLongPair(ledgerId, entryId);
if (ackOwnerConsumerAndLongPair.hasResult()) {
ackOwnerConsumer = ackOwnerConsumerAndLongPair.consumer;
}
}
return ackOwnerConsumer;
Expand Down Expand Up @@ -965,43 +973,63 @@ public int hashCode() {
* @param position
*/
private boolean removePendingAcks(PositionImpl position) {
ConsumerAndLongPair ackOwnerConsumerAndLongPair = getAckOwnerConsumerAndLongPair(position.getLedgerId(),
position.getEntryId());
// remove pending message from appropriate consumer and unblock unAckMsg-flow if requires
if (ackOwnerConsumerAndLongPair.pendingAckLongPair != null) {
return removePendingAckWithOwnedConsumer(position, ackOwnerConsumerAndLongPair.consumer);
}

return false;
}

record ConsumerAndLongPair(Consumer consumer, LongPair pendingAckLongPair) {
public boolean hasResult() {
return consumer != null && pendingAckLongPair != null;
}
}

private ConsumerAndLongPair getAckOwnerConsumerAndLongPair(long ledgerId, long entryId) {
Consumer ackOwnedConsumer = null;
if (pendingAcks.get(position.getLedgerId(), position.getEntryId()) == null) {
LongPair ackedPosition = null;

LongPair consumerAckPos = pendingAcks.get(ledgerId, entryId);
if (consumerAckPos == null) {
for (Consumer consumer : subscription.getConsumers()) {
if (!consumer.equals(this) && consumer.getPendingAcks().containsKey(position.getLedgerId(),
position.getEntryId())) {
LongPair pos = consumer.getPendingAcks().get(ledgerId, entryId);
if (!consumer.equals(this) && pos != null) {
ackOwnedConsumer = consumer;
ackedPosition = pos;
break;
}
}
} else {
ackOwnedConsumer = this;
ackedPosition = consumerAckPos;
}

// remove pending message from appropriate consumer and unblock unAckMsg-flow if requires
LongPair ackedPosition = ackOwnedConsumer != null
? ackOwnedConsumer.getPendingAcks().get(position.getLedgerId(), position.getEntryId())
: null;
if (ackedPosition != null) {
if (!ackOwnedConsumer.getPendingAcks().remove(position.getLedgerId(), position.getEntryId())) {
// Message was already removed by the other consumer
return false;
}
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received ack {}", topicName, subscription, consumerId, position);
}
// unblock consumer-throttling when limit check is disabled or receives half of maxUnackedMessages =>
// consumer can start again consuming messages
int unAckedMsgs = UNACKED_MESSAGES_UPDATER.get(ackOwnedConsumer);
if ((((unAckedMsgs <= getMaxUnackedMessages() / 2) && ackOwnedConsumer.blockedConsumerOnUnackedMsgs)
&& ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs())
|| !shouldBlockConsumerOnUnackMsgs()) {
ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
flowConsumerBlockedPermits(ackOwnedConsumer);
}
return true;
return new ConsumerAndLongPair(ackOwnedConsumer, ackedPosition);
}

private boolean removePendingAckWithOwnedConsumer(PositionImpl position, Consumer ackOwnedConsumer) {
if (!ackOwnedConsumer.getPendingAcks().remove(position.getLedgerId(), position.getEntryId())) {
// Message was already removed by the other consumer
return false;
}
return false;
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received ack {}", topicName, subscription, consumerId, position);
}
// unblock consumer-throttling when limit check is disabled or receives half of maxUnackedMessages =>
// consumer can start again consuming messages
int unAckedMsgs = UNACKED_MESSAGES_UPDATER.get(ackOwnedConsumer);
if ((((unAckedMsgs <= getMaxUnackedMessages() / 2) && ackOwnedConsumer.blockedConsumerOnUnackedMsgs)
&& ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs())
|| !shouldBlockConsumerOnUnackMsgs()) {
ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
flowConsumerBlockedPermits(ackOwnedConsumer);
}

return true;
}

public ConcurrentLongLongPairHashMap getPendingAcks() {
Expand Down