Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 937: add CommandGetLastMessageId to make reader know the end of topic #1066

Merged
merged 10 commits into from
Feb 14, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -334,4 +334,11 @@ public interface ManagedLedger {
* @param config
*/
void setConfig(ManagedLedgerConfig config);

/**
* Gets last confirmed entry of the managed ledger.
*
* @return the last confirmed entry id
*/
Position getLastConfirmedEntry();
}
Original file line number Diff line number Diff line change
Expand Up @@ -1062,7 +1062,7 @@ private void closeAllCursors(CloseCallback callback, final Object ctx) {
Futures.waitForAll(futures).thenRun(() -> {
callback.closeComplete(ctx);
}).exceptionally(exception -> {
callback.closeFailed(getManagedLedgerException(exception.getCause()), ctx);
callback.closeFailed(ManagedLedgerException.getManagedLedgerException(exception.getCause()), ctx);
return null;
});
}
Expand Down Expand Up @@ -1282,7 +1282,7 @@ void asyncReadEntries(OpReadEntry opReadEntry) {
}).exceptionally(ex -> {
log.error("[{}] Error opening ledger for reading at position {} - {}", name, opReadEntry.readPosition,
ex.getMessage());
opReadEntry.readEntriesFailed(getManagedLedgerException(ex.getCause()), opReadEntry.ctx);
opReadEntry.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), opReadEntry.ctx);
return null;
});
}
Expand Down Expand Up @@ -1351,7 +1351,7 @@ void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Object ct
entryCache.asyncReadEntry(ledger, position, callback, ctx);
}).exceptionally(ex -> {
log.error("[{}] Error opening ledger for reading at position {} - {}", name, position, ex.getMessage());
callback.readEntryFailed(getManagedLedgerException(ex.getCause()), ctx);
callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), ctx);
return null;
});
}
Expand Down Expand Up @@ -2173,7 +2173,8 @@ public int getPendingAddEntriesCount() {
return pendingAddEntries.size();
}

public PositionImpl getLastConfirmedEntry() {
@Override
public Position getLastConfirmedEntry() {
return lastConfirmedEntry;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;

import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
Expand All @@ -59,6 +57,7 @@
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStats;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandProducer;
Expand Down Expand Up @@ -110,7 +109,7 @@ public class ServerCnx extends PulsarHandler {
private String originalPrincipal = null;
private Set<String> proxyRoles;
private boolean authenticateOriginalAuthData;

enum State {
Start, Connected, Failed
}
Expand Down Expand Up @@ -192,8 +191,8 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
}

/*
* If authentication and authorization is enabled and if the authRole is one of proxyRoles we want to enforce
* - the originalPrincipal is given while connecting
* If authentication and authorization is enabled and if the authRole is one of proxyRoles we want to enforce
* - the originalPrincipal is given while connecting
* - originalPrincipal is not blank
* - originalPrincipal is not a proxy principal
*/
Expand All @@ -218,7 +217,7 @@ protected void handleLookup(CommandLookupTopic lookup) {
if (topicName == null) {
return;
}

String originalPrincipal = null;
if (authenticateOriginalAuthData && lookup.hasOriginalAuthData()) {
originalPrincipal = validateOriginalPrincipal(
Expand All @@ -233,9 +232,9 @@ protected void handleLookup(CommandLookupTopic lookup) {
} else {
originalPrincipal = lookup.hasOriginalPrincipal() ? lookup.getOriginalPrincipal() : this.originalPrincipal;
}

final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
if (lookupSemaphore.tryAcquire()) {
if (invalidOriginalPrincipal(originalPrincipal)) {
final String msg = "Valid Proxy Client role should be provided for lookup ";
log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", remoteAddress, msg, authRole,
Expand Down Expand Up @@ -319,7 +318,7 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
} else {
originalPrincipal = partitionMetadata.hasOriginalPrincipal() ? partitionMetadata.getOriginalPrincipal() : this.originalPrincipal;
}

final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
if (invalidOriginalPrincipal(originalPrincipal)) {
Expand Down Expand Up @@ -441,7 +440,7 @@ CommandConsumerStatsResponse.Builder createConsumerStatsResponse(Consumer consum

return commandConsumerStatsResponseBuilder;
}

private String validateOriginalPrincipal(String originalAuthData, String originalAuthMethod, String originalPrincipal, Long requestId, GeneratedMessageLite request) {
ChannelHandler sslHandler = ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER);
SSLSession sslSession = null;
Expand All @@ -461,7 +460,7 @@ private String validateOriginalPrincipal(String originalAuthData, String origina
return null;
}
}

private String getOriginalPrincipal(String originalAuthData, String originalAuthMethod, String originalPrincipal,
SSLSession sslSession) throws AuthenticationException {
if (authenticateOriginalAuthData) {
Expand Down Expand Up @@ -532,7 +531,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
DestinationName topicName = validateTopicName(subscribe.getTopic(), requestId, subscribe);
if (topicName == null) {
return;
}
}

if (invalidOriginalPrincipal(originalPrincipal)) {
final String msg = "Valid Proxy Client role should be provided while subscribing ";
Expand Down Expand Up @@ -1104,6 +1103,35 @@ protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
}
}

@Override
protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) {
checkArgument(state == State.Connected);

CompletableFuture<Consumer> consumerFuture = consumers.get(getLastMessageId.getConsumerId());

if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
long requestId = getLastMessageId.getRequestId();

Topic topic = consumer.getSubscription().getTopic();
Position position = topic.getLastMessageId();
int partitionIndex = DestinationName.getPartitionIndex(topic.getName());
if (log.isDebugEnabled()) {
log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress,
topic.getName(), consumer.getSubscription().getName(), position, partitionIndex);
}
MessageIdData messageId = MessageIdData.newBuilder()
.setLedgerId(((PositionImpl)position).getLedgerId())
.setEntryId(((PositionImpl)position).getEntryId())
.setPartition(partitionIndex)
.build();

ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId));
} else {
ctx.writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), ServerError.MetadataError, "Consumer not found"));
}
}

@Override
protected boolean isHandshakeCompleted() {
return state == State.Connected;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.api.MessageId;
Expand Down Expand Up @@ -122,4 +123,6 @@ void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats
PersistentTopicStats getStats();

PersistentTopicInternalStats getInternalStats();

Position getLastMessageId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.pulsar.broker.admin.AdminResource;
Expand Down Expand Up @@ -909,6 +910,11 @@ public CompletableFuture<Void> unsubscribe(String subName) {
return CompletableFuture.completedFuture(null);
}

@Override
public Position getLastMessageId() {
throw new UnsupportedOperationException("getLastMessageId is not supported on non-persistent topic");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be noted in the javadoc for the client api.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. will change it.

}

public void markBatchMessagePublished() {
this.hasBatchMessagePublished = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1536,5 +1536,10 @@ public long getLastPublishedSequenceId(String producerName) {
return messageDeduplication.getLastPublishedSequenceId(producerName);
}

@Override
public Position getLastMessageId() {
return ledger.getLastConfirmedEntry();
}

private static final Logger log = LoggerFactory.getLogger(PersistentTopic.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
package org.apache.pulsar.client.api;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

import com.google.common.collect.Sets;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
Expand All @@ -28,8 +32,8 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.policies.data.PersistentTopicStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -38,8 +42,6 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.google.common.collect.Sets;

public class TopicReaderTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(TopicReaderTest.class);

Expand Down Expand Up @@ -359,4 +361,103 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
reader.close();
log.info("-- Exiting {} test --", methodName);
}


@Test
public void testSimpleReaderReachEndofTopic() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One scenario in which we need to pay attention is when batches are used.

With batches, multiple messages are stored in a single BK entry, and the broker treat a batch as a unit, which the consumer will finally break up, presenting the individual messages to application.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, will add the test.

ReaderConfiguration conf = new ReaderConfiguration();
Reader reader = pulsarClient.createReader("persistent://my-property/use/my-ns/my-topic1", MessageId.earliest,
conf);
ProducerConfiguration producerConf = new ProducerConfiguration();
Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf);

// no data write, should return false
assertFalse(reader.hasMessageAvailable());

// produce message 0 -- 99
for (int i = 0; i < 100; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}

MessageImpl msg = null;
Set<String> messageSet = Sets.newHashSet();
int index = 0;

// read message till end.
while (reader.hasMessageAvailable()) {
msg = (MessageImpl) reader.readNext(1, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.debug("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + (index ++);
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
}

assertEquals(index, 100);
// readNext should return null, after reach the end of topic.
assertNull(reader.readNext(1, TimeUnit.SECONDS));

// produce message again.
for (int i = 100; i < 200; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}

// read message till end again.
while (reader.hasMessageAvailable()) {
msg = (MessageImpl) reader.readNext(1, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.debug("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + (index ++);
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
}

assertEquals(index, 200);
// readNext should return null, after reach the end of topic.
assertNull(reader.readNext(1, TimeUnit.SECONDS));

producer.close();
}

@Test
public void testReaderReachEndofTopicOnMessageWithBatches() throws Exception {
Reader reader = pulsarClient.createReader(
"persistent://my-property/use/my-ns/testReaderReachEndofTopicOnMessageWithBatches", MessageId.earliest,
new ReaderConfiguration());

ProducerConfiguration producerConf = new ProducerConfiguration();
producerConf.setBatchingEnabled(true);
producerConf.setBatchingMaxPublishDelay(100, TimeUnit.MILLISECONDS);
Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/testReaderReachEndofTopicOnMessageWithBatches", producerConf);

// no data write, should return false
assertFalse(reader.hasMessageAvailable());

for (int i = 0; i < 100; i++) {
String message = "my-message-" + i;
producer.sendAsync(message.getBytes());
}

// Write one sync message to ensure everything before got persistend
producer.send("my-message-10".getBytes());

MessageId lastMessageId = null;
int index = 0;
assertTrue(reader.hasMessageAvailable());

if (reader.hasMessageAvailable()) {
Message msg = reader.readNext();
lastMessageId = msg.getMessageId();
assertEquals(lastMessageId.getClass(), BatchMessageIdImpl.class);

while (msg != null) {
index++;
msg = reader.readNext(100, TimeUnit.MILLISECONDS);
}
assertEquals(index, 101);
}

assertFalse(reader.hasMessageAvailable());
producer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,14 @@ public interface Reader extends Closeable {
* Return true if the topic was terminated and this reader has reached the end of the topic
*/
boolean hasReachedEndOfTopic();

/**
* Check if there is any message available to read from the current position.
*/
boolean hasMessageAvailable() throws PulsarClientException;

/**
* Asynchronously Check if there is message that has been published successfully to the broker in the topic.
*/
CompletableFuture<Boolean> hasMessageAvailableAsync();
}
Loading