Issue 937: add CommandGetLastMessageId to make reader know the end of topic #1066
retest this please
Looks good. A few minor comments.
What happens when this is called but nothing has yet been written to the topic?
Also, there are a lot of changes unrelated to the logic changes (such as moving headers and whitespace). It'd be nice for these to be in a separate commit, even if in the same pull request.
@@ -323,14 +321,16 @@
public boolean isTerminated();

/**
 * Returns managed-ledger config
unrelated change, adds noise to an already large review.
Thanks, this is an automatic change made by IntelliJ IDEA for checkstyle.
 * @param config
 */
void setConfig(ManagedLedgerConfig config);

Position getLastConfirmedEntry();
Needs javadoc
Thanks, will add it if needed.
Position position = consumer.getSubscription().getTopic().getLastMessageId();

log.info("[{}] [{}][{}] Get LastMessageId {}", remoteAddress,
info is a bit loud for this. debug would be better.
Thanks, will change it to debug.
log.info("[{}] [{}][{}] Get LastMessageId {}", remoteAddress,
        consumer.getSubscription().getTopic().getName(), consumer.getSubscription().getName(), position);
MessageIdData messageId = MessageIdData.newBuilder()
Missing partition, which I guess you can get from the subscription. How does this deal with batch messages? What if the returned position is a batch message? The message id returned to the client wouldn't actually exist in the stream. Not sure how to deal with this.
Thanks. It seems we only have the ledger id and message id here. I will note in the client API that it only supports non-partitioned topics. The partition and batch index here should be the default values.
It would be good to at least support the partition id so that partitions can be compacted (this feature is needed for compaction). You can likely get it from the subscription or topic name. @merlimat could you weigh in on this?
You probably can't get batch id because it's embedded in the message, so only the client has access. The only way to get this would be to implement the whole feature as sending the entire last message to the client, which likely isn't sensible. However, this does mean equality and comparisons may be messed up, as the returned last message id may not actually match the message id of a message in the topic.
Noticed this just now. I will add the partition here if it is needed.
Actually, maybe it would make more sense to add the partition on the client side? From the broker's point of view, a partition really just looks like another topic; it has its own managed ledger. It's only on the client side that partition and batch id are important. (@merlimat @sijie what are your opinions on this?)
I think it should be on the client side, because the "partition" related logic currently lives on the client side.
@@ -903,6 +904,11 @@ public boolean isEncryptionRequired() {
    return CompletableFuture.completedFuture(null);
}

@Override
public Position getLastMessageId() {
    throw new UnsupportedOperationException("getLastMessageId is not supported on non-persistent topic");
This should be noted in the javadoc for the client api.
Thanks, will change it.
@@ -278,4 +277,19 @@
 * @return a future to track the completion of the seek operation
 */
CompletableFuture<Void> seekAsync(MessageId messageId);

/**
 * Gets last message id of Topic.
Be more explicit about what the last message id of the topic actually is. It's the message id of the last message for the topic which has been published successfully to the broker.
Thanks, will add it.
@@ -62,4 +62,6 @@
 * Return true if the topic was terminated and this reader has reached the end of the topic
 */
boolean hasReachedEndOfTopic();

MessageId getLastMessageId() throws PulsarClientException;
Needs an async version and javadoc.
Thanks, will add it.
public CompletableFuture<MessageIdData> sendGetLastMessageId(ByteBuf request, long requestId) {
    CompletableFuture<MessageIdData> future = new CompletableFuture<>();

    pendingGetLastMessageIdRequests.put(requestId, future);
@merlimat how are timeouts handled with Pulsar's RPC?
Unfortunately we don't have a general way of handling it.
 *
 * @return the last message id
 */
MessageId getLastMessageId() throws PulsarClientException;
Rather than exposing this method, I would prefer to have a simpler way to check it, for a couple of reasons:
- Using and comparing message ids can be tricky (was that < or > ? :) )
MessageId lastMessageId = consumer.getLastMessageId();
MessageId currentMessageId = null;
while (currentMessageId == null || currentMessageId.compareTo(lastMessageId) < 0) {
    Message msg = consumer.receive();
    // Track the position, otherwise the loop condition never changes
    currentMessageId = msg.getMessageId();
    // Do something
    consumer.acknowledge(msg);
}
- Most typical code would be like the above snippet and do a tight loop asking the broker for the last message id, without any throttling or flow control.
Rather, I would prefer something like:
while (consumer.hasMessageAvailable()) {
    Message msg = consumer.receive();
    // Do something
    consumer.acknowledge(msg);
}
// Exits the loop when the consumer has caught up with the writer
Internally, the client library can ask the broker for the last message id and cache it. For example, if lastMessageId=123:12 but the consumer is currently receiving 110:5, there's no need to keep asking the broker.
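The caching idea can be sketched roughly as follows. This is only an illustration, not the code in this PR: `Position`, `LastIdCache`, `brokerLookup` and `brokerCalls` are hypothetical names standing in for the message id, the consumer-side state, and the GetLastMessageId round trip to the broker.

```java
import java.util.function.Supplier;

/** Hypothetical (ledgerId, entryId) pair, standing in for a message id such as 123:12. */
final class Position implements Comparable<Position> {
    final long ledgerId;
    final long entryId;

    Position(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    @Override
    public int compareTo(Position other) {
        int c = Long.compare(ledgerId, other.ledgerId);
        return c != 0 ? c : Long.compare(entryId, other.entryId);
    }
}

/** Sketch of a client-side cache that only asks the broker once the reader has caught up. */
class LastIdCache {
    private final Supplier<Position> brokerLookup; // assumed stand-in for the broker round trip
    private Position cachedLast;                   // last id reported by the broker, null if never asked
    int brokerCalls = 0;                           // exposed only to show how rarely the broker is asked

    LastIdCache(Supplier<Position> brokerLookup) {
        this.brokerLookup = brokerLookup;
    }

    /** Returns true if a message after {@code current} is known to exist. */
    boolean hasMessageAvailable(Position current) {
        // Fast path: the cached id is already ahead of the reader, no need to ask again.
        if (cachedLast != null && cachedLast.compareTo(current) > 0) {
            return true;
        }
        // Slow path: the reader caught up with what is cached, so refresh from the broker.
        cachedLast = brokerLookup.get();
        brokerCalls++;
        return cachedLast.compareTo(current) > 0;
    }
}
```

With a cached last id of 123:12 and a reader at 110:5, every call takes the fast path; the broker is only asked again once the reader reaches 123:12.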
Hmm, consumer.hasMessageAvailable(): I am not sure it is a good idea. It introduces non-deterministic behaviour for users, who will have no idea where to stop. I agree rate limiting is also important; however, should we consider rate limiting this command on the broker side rather than on the client side? That is probably a better solution for the long term. Thoughts?
In the above example, rate limiting in the broker will slow down the consumption.
Thanks for all the comments.
If needed, I will add a position variable to track the last position in incomingMessages to implement consumer.hasMessageAvailable().
8553269 to 9c41a3d
@ivankelly Thanks.
@ivankelly @sijie @merlimat Would you please help take a look at the new change? Thanks.
Rebased on master to fix conflicts.
/cc @merlimat for review
One comment about bumping the protocol version. Otherwise, the change looks good to me. +1
@@ -415,6 +415,16 @@ message CommandConsumerStatsResponse {
    optional uint64 msgBacklog = 15;
}

message CommandGetLastMessageId {
Since we are introducing a new command, can you bump the "ProtocolVersion"?
Thanks, will add it.
Bumped the protocol version to v11.
@zhaijack The client should check that the broker's supported version is >= v11 before sending the command.
If the broker doesn't support the feature, I don't think there's a possible fallback in this case, so we should just throw an exception to the user.
Thanks, will add the check.
retest this please
The change looks good to me overall. I just left a few suggestions.
It would also be good to have a C++/Python implementation after this PR is merged, so that we have the same feature for Reader in all languages.
/**
 * Asynchronously Check if there is message that has been published successfully to the broker in the topic.
 */
CompletableFuture<Boolean> hasMessageAvailableAsync();
I think this feature is mostly useful on the Reader interface. I think we should hide it from here, at least until we can find a concrete use case for it.
Got it, will hide it here.
/**
 * Check if there is message that has been published successfully to the broker in the topic.
 */
Boolean hasMessageAvailable() throws PulsarClientException;
Boolean -> boolean
To minimize the time "blocked", we could include the last-published message id when the broker sends the CommandMessage, so that the client can avoid asking most of the time, since it will already see that the last message id is ahead of the current position.
Yes. We keep the value as lastMessageIdInBroker in ConsumerImpl, so most of the time the answer is returned from the client without calling into the broker.
@@ -62,4 +62,14 @@
 * Return true if the topic was terminated and this reader has reached the end of the topic
 */
boolean hasReachedEndOfTopic();

/**
 * Check if there is message that has been published successfully to the broker in the topic.
"Check if there is any message available to read from the current position"
private MessageIdImpl() {
    this(-1, -1, -1);
// create MessageIdImpl use value of MessageId.earliest
public MessageIdImpl() {
Since MessageIdImpl is immutable, couldn't we just reference MessageId.earliest instead of constructing a new one equal to that?
Thanks, will change it back and reference earliest where this is used. I thought it might make this clearer than a hard-coded -1.
@Test
public void testSimpleReaderReachEndofTopic() throws Exception {
One scenario in which we need to pay attention is when batches are used.
With batches, multiple messages are stored in a single BK entry, and the broker treats a batch as a unit, which the consumer will finally break up, presenting the individual messages to the application.
Thanks, will add the test.
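To make the batch scenario concrete, here is a rough sketch of why the check has to work at entry granularity. It assumes, as discussed earlier in this thread, that the id returned by the broker carries no batch index. `BatchId` and `BatchAwareReader` are hypothetical names, not classes from this PR.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical message id: batchIndex is -1 when the id does not point inside a batch. */
final class BatchId {
    final long ledgerId;
    final long entryId;
    final int batchIndex;

    BatchId(long ledgerId, long entryId, int batchIndex) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.batchIndex = batchIndex;
    }
}

/** Sketch of a reader that unpacks whole entries into individual messages. */
class BatchAwareReader {
    private final Queue<BatchId> buffered = new ArrayDeque<>();
    private BatchId lastRead = new BatchId(-1, -1, -1); // nothing read yet

    /** The broker delivers one entry; the client splits it into batchSize messages. */
    void onEntry(long ledgerId, long entryId, int batchSize) {
        for (int i = 0; i < batchSize; i++) {
            buffered.add(new BatchId(ledgerId, entryId, i));
        }
    }

    /** Hands the next unpacked message to the application, or null if none is buffered. */
    BatchId read() {
        BatchId next = buffered.poll();
        if (next != null) {
            lastRead = next;
        }
        return next;
    }

    /** brokerLast has no batch index, so it is compared by ledger and entry only. */
    boolean hasMessageAvailable(BatchId brokerLast) {
        // Messages from an already delivered batch are still pending locally.
        if (!buffered.isEmpty()) {
            return true;
        }
        int c = Long.compare(brokerLast.ledgerId, lastRead.ledgerId);
        if (c == 0) {
            c = Long.compare(brokerLast.entryId, lastRead.entryId);
        }
        return c > 0;
    }
}
```

A test for the batch case could publish one batch of three messages, read them one by one, and check that the reader only reports the end of the topic after the third message.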
@@ -28,8 +31,10 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import javax.validation.constraints.AssertFalse;
Use testng assertFalse
Thanks, will change it.
retest this please
@@ -135,6 +135,7 @@ enum ProtocolVersion {
    v9 = 9; // Added end of topic notification
    v10 = 10;// Added proxy to broker
    v11 = 11;// C++ consumers before this version are not correctly handling the checksum field
             //Added get topic's last messageId from broker
@zhaijack There was a merge issue here: v11 was already taken in master, so this PR should be adding v12 now.
public CompletableFuture<Boolean> hasMessageAvailableAsync() {
    final CompletableFuture<Boolean> booleanFuture = new CompletableFuture<>();

    if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
We should also do this check in the sync hasMessageAvailable() case, so that we can avoid creating a CompletableFuture each time we check when the lastMessageId is already cached.
Thanks, will add it.
        .failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed"));
}

if (cnx().getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getNumber()) {
If it's not connected, cnx() will return null here.
Thanks, will move the isConnected check up.
        cnx().getRemoteEndpointProtocolVersion()));
}

if (!isConnected()) {
If it's currently not connected, we should try to mask the exception from the user if the failure is transient.
There is already an operationTimeout defined in the client; it would be good to have a way to retry internally with backoff up to that amount of time.
@zhaijack: can you address @merlimat's and @ivankelly's comments? We need this change for the function worker.
Thanks, I have resolved all the comments and fixed the rebase conflict.
Looks good, just one final comment on the retries when disconnected.
}

if (!isConnected()) {
    long opTimeoutMs = client.getConfiguration().getOperationTimeoutMs();
This implementation will block the caller of an asynchronous method, which might be unexpected. One way to do the retries would be to have an internal method that gets called asynchronously, in recursion. For example, something like:
private CompletableFuture<MessageId> internalGetLastMessageIdAsync(Backoff backoff, long remainingTime) {
    if (connected) {
        // write on socket and return future
    } else {
        // if time is not elapsed yet...
        long nextDelay = backoff.next();
        executor.schedule(() -> {
            remainingTime -= (timeSpentSinceLastCall);
            internalGetLastMessageIdAsync(backoff, remainingTime);
        }, nextDelay, TimeUnit.MILLISECONDS);
    }
}
(Don't read too much into the previous example, I'm just trying to illustrate the basic idea.)
Thanks, will change this.
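For reference, a self-contained sketch of the non-blocking retry idea might look like the following. It is not the code that ended up in this PR: `RetryingLookup`, `callWhenConnected` and the `connected`/`request` parameters are hypothetical, and a simple doubling delay stands in for Pulsar's `Backoff` helper.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

class RetryingLookup {
    /** Returns immediately; the future completes once connected or when the time budget is spent. */
    static <T> CompletableFuture<T> callWhenConnected(ScheduledExecutorService executor,
            BooleanSupplier connected, Supplier<CompletableFuture<T>> request,
            long delayMs, long remainingMs) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(executor, connected, request, delayMs, remainingMs, result);
        return result;
    }

    private static <T> void attempt(ScheduledExecutorService executor,
            BooleanSupplier connected, Supplier<CompletableFuture<T>> request,
            long delayMs, long remainingMs, CompletableFuture<T> result) {
        if (connected.getAsBoolean()) {
            // Connected: issue the request and forward its outcome to the caller's future.
            request.get().whenComplete((value, error) -> {
                if (error != null) {
                    result.completeExceptionally(error);
                } else {
                    result.complete(value);
                }
            });
            return;
        }
        if (remainingMs <= 0) {
            result.completeExceptionally(
                    new TimeoutException("not connected within the operation timeout"));
            return;
        }
        // Not connected yet: try again later without blocking the calling thread.
        long wait = Math.min(delayMs, remainingMs);
        executor.schedule(
                () -> attempt(executor, connected, request, delayMs * 2, remainingMs - wait, result),
                wait, TimeUnit.MILLISECONDS);
    }
}
```

Passing the remaining time as an argument to each scheduled attempt avoids mutating a captured local inside the lambda, which Java does not allow.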
    }
}

if (cnx().getRemoteEndpointProtocolVersion() < ProtocolVersion.v12.getNumber()) {
Nit: please wrap this in a method like:
boolean Commands.peerSupportsGetLastMessageId(ClientCnx cnx);
@@ -952,4 +952,8 @@ public static ByteBuf newLookup(String topic, boolean authoritative, String orig
    lookupBroker.recycle();
    return res;
}

public static boolean peerSupportsGetLastMessageId() {
    return getCurrentProtocolVersion() >= ProtocolVersion.v12.getNumber();
This is just checking our own protocol version (which is always the "latest" from when we compiled the protobuf), but we need to check the version of the other side, in this case the broker.
This should be like:
public static boolean peerSupportsGetLastMessageId(ClientCnx cnx) {
    return cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v12.getNumber();
}
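As a standalone illustration of gating the command on the peer's version (and throwing to the user when there is no fallback), something along these lines would do. `PeerConnection`, `VersionGate` and `ensureSupported` are hypothetical names used only for this sketch; the constant 12 mirrors the v12 agreed on in this review.

```java
/** Hypothetical view of a connection: only the version advertised by the remote side. */
interface PeerConnection {
    int getRemoteEndpointProtocolVersion();
}

class VersionGate {
    // Assumed value: the protocol version that introduces the new command in this review.
    static final int GET_LAST_MESSAGE_ID_VERSION = 12;

    /** Checks the peer's version, not the version this client was compiled with. */
    static boolean peerSupportsGetLastMessageId(PeerConnection cnx) {
        return cnx.getRemoteEndpointProtocolVersion() >= GET_LAST_MESSAGE_ID_VERSION;
    }

    /** No fallback exists for an older broker, so surface the problem to the caller. */
    static void ensureSupported(PeerConnection cnx) {
        if (!peerSupportsGetLastMessageId(cnx)) {
            throw new UnsupportedOperationException(
                    "GetLastMessageId is not supported by protocol version "
                            + cnx.getRemoteEndpointProtocolVersion());
        }
    }
}
```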
@@ -145,6 +151,9 @@
this.batchMessageAckTracker = new ConcurrentSkipListMap<>();
this.readCompacted = conf.getReadCompacted();

this.getLastIdExecutor = Executors
This would create one thread per consumer. We should reuse the executor that is already available from PulsarClientImpl.
private void internalGetLastMessageIdAsync(final Backoff backoff,
                                           final AtomicLong remainingTime,
                                           CompletableFuture<MessageId> future) {
    if (isConnected()) {
I missed this before, but we should make sure the connection doesn't change while we're executing this method.
isConnected() is checking the current connection, but that might change when we ask for cnx() a few lines below.
We need to first get a reference to the ClientCnx and use that throughout the method.
ClientCnx cnx = cnx();
if (cnx != null) {
    // check cnx.getRemoteEndpointProtocolVersion();
    cnx.sendGetLastMessageId()...
}
Thanks, will change it.
👍
This is a first version, meant to collect suggestions on how to achieve it.
Motivation
We have recently introduced the concept of topic "Reader" as an alternative to the traditional pub-sub consumer abstraction.
A common followup request has been to have a way to identify when the reader has reached the last published entry on the topic.
There is currently no direct way to achieve that, and using readNext(timeout) doesn't help: if the client is not connected to the broker, a timeout doesn't mean that there are no more messages to read.
There are a few workarounds, but they are not easy or desirable (e.g. terminating the topic, or using the HTTP admin API to check the backlog on the reader).
Since this is a common theme, we should have a good way to handle this.
Modifications
Result
A new command, CommandGetLastMessageId, is added to the binary protocol.