Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve batch message acking by removing batch message tracker #1424

Merged
merged 6 commits into from
Mar 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
Expand All @@ -42,7 +43,6 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -434,7 +434,7 @@ public void testOutOfOrderAcksForBatchMessage() throws Exception {
Message<byte[]> lastunackedMsg = null;
for (int i = 0; i < numMsgs; i++) {
Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
LOG.info("received message {}", String.valueOf(msg.getData()));
LOG.info("received message {}", new String(msg.getData(), UTF_8));
assertNotNull(msg);
if (i == 8) {
consumer.acknowledgeCumulative(msg);
Expand Down Expand Up @@ -514,7 +514,6 @@ public void testNonBatchCumulativeAckAfterBatchPublish() throws Exception {
Thread.sleep(100);
rolloverPerIntervalStats();
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 0);
assertTrue(((ConsumerImpl<byte[]>) consumer).isBatchingAckTrackerEmpty());
consumer.close();
producer.close();
noBatchProducer.close();
Expand Down Expand Up @@ -574,7 +573,6 @@ public void testBatchAndNonBatchCumulativeAcks() throws Exception {
}
Thread.sleep(100);
assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 0);
assertTrue(((ConsumerImpl<byte[]>) consumer).isBatchingAckTrackerEmpty());
consumer.close();
producer.close();
noBatchProducer.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import com.google.common.annotations.VisibleForTesting;
import java.util.BitSet;

class BatchMessageAcker {

static BatchMessageAcker newAcker(int batchSize) {
BitSet bitSet = new BitSet(batchSize);
bitSet.set(0, batchSize);
return new BatchMessageAcker(bitSet, batchSize);
}

// bitset shared across messages in the same batch.
private final int batchSize;
private final BitSet bitSet;
private boolean prevBatchCumulativelyAcked = false;

BatchMessageAcker(BitSet bitSet, int batchSize) {
this.bitSet = bitSet;
this.batchSize = batchSize;
}

@VisibleForTesting
BitSet getBitSet() {
return bitSet;
}

public synchronized int getBatchSize() {
return batchSize;
}

public synchronized boolean ackIndividual(int batchIndex) {
bitSet.clear(batchIndex);
return bitSet.isEmpty();
}

public synchronized boolean ackCumulative(int batchIndex) {
// +1 since to argument is exclusive
bitSet.clear(0, batchIndex + 1);
return bitSet.isEmpty();
}

// debug purpose
public synchronized int getOutstandingAcks() {
return bitSet.cardinality();
}

public void setPrevBatchCumulativelyAcked(boolean acked) {
this.prevBatchCumulativelyAcked = acked;
}

public boolean isPrevBatchCumulativelyAcked() {
return prevBatchCumulativelyAcked;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

class BatchMessageAckerDisabled extends BatchMessageAcker {

static final BatchMessageAckerDisabled INSTANCE = new BatchMessageAckerDisabled();

private BatchMessageAckerDisabled() {
super(null, 0);
}

@Override
public synchronized int getBatchSize() {
return 0;
}

@Override
public boolean ackIndividual(int batchIndex) {
return true;
}

@Override
public boolean ackCumulative(int batchIndex) {
return true;
}

@Override
public int getOutstandingAcks() {
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,27 @@ public class BatchMessageIdImpl extends MessageIdImpl {
private final static int NO_BATCH = -1;
private final int batchIndex;

private final BatchMessageAcker acker;

public BatchMessageIdImpl(long ledgerId, long entryId, int partitionIndex, int batchIndex) {
this(ledgerId, entryId, partitionIndex, batchIndex, BatchMessageAckerDisabled.INSTANCE);
}

public BatchMessageIdImpl(long ledgerId, long entryId, int partitionIndex, int batchIndex, BatchMessageAcker acker) {
super(ledgerId, entryId, partitionIndex);
this.batchIndex = batchIndex;
this.acker = acker;
}

public BatchMessageIdImpl(MessageIdImpl other) {
super(other.ledgerId, other.entryId, other.partitionIndex);
if (other instanceof BatchMessageIdImpl) {
this.batchIndex = ((BatchMessageIdImpl) other).batchIndex;
BatchMessageIdImpl otherId = (BatchMessageIdImpl) other;
this.batchIndex = otherId.batchIndex;
this.acker = otherId.acker;
} else {
this.batchIndex = NO_BATCH;
this.acker = BatchMessageAckerDisabled.INSTANCE;
}
}

Expand Down Expand Up @@ -95,4 +105,30 @@ public String toString() {
public byte[] toByteArray() {
return toByteArray(batchIndex);
}

public boolean ackIndividual() {
return acker.ackIndividual(batchIndex);
}

public boolean ackCumulative() {
return acker.ackCumulative(batchIndex);
}

public int getOutstandingAcksInSameBatch() {
return acker.getOutstandingAcks();
}

public int getBatchSize() {
return acker.getBatchSize();
}

public MessageIdImpl prevBatchMessageId() {
return new MessageIdImpl(
ledgerId, entryId - 1, partitionIndex);
}

public BatchMessageAcker getAcker() {
return acker;
}

}
Loading