PIP-13-1/3: Provide TopicsConsumer to consume from several topics under same namespace #1103
retest this please
 *
 * @return the topic name
 */
default String getTopicName() {
Why is this in both Message and MessageId?
I understand that the implementation needs to know which topic originated the message so that it can "acknowledge" on the right internal consumer, but I don't think this needs to be exposed in the MessageId interface.
Yes, it is as @merlimat mentioned. For MessageId, we also need topicName for redeliverUnacknowledgedMessages. Will remove it from MessageId.
@@ -468,17 +468,20 @@ public void redeliverUnacknowledgedMessages() {
     }

     @Override
-    public void redeliverUnacknowledgedMessages(Set<MessageIdImpl> messageIds) {
+    public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
Why are you changing this to MessageId? You're casting to MessageIdImpl anyhow.
Thanks. As in the reply above, we also need this redeliverUnacknowledgedMessages method in Consumer.java to handle TopicMessageIdImpl. I would like to change this in Consumer.java and do the cast in each child class.
In Consumer.java it doesn't know about the MessageIds. The call takes no parameters.
TopicMessageIdImpl should be a specialization of MessageIdImpl (I assumed it was when I originally commented, until I saw Matteo's comment to the same effect). If it is a specialization of MessageIdImpl, then this signature doesn't need to change.
Oh, it is in ConsumerBase.java; sorry for the wrong reference to Consumer.java.
TopicMessageIdImpl needs to be more of a wrapper for MessageIdImpl and, as in the reply below, it may be better for it to implement MessageId instead of extending MessageIdImpl.
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.util.Timeout;
import io.netty.util.TimerTask;

public class UnAckedMessageTracker implements Closeable {
UnAckedMessageTracker is a member of ConsumerImpl. Since there is a new ConsumerImpl in this patch, instead of changing the sets to MessageId, you should create a generic type parameter on this class, and let the creator of the class decide which message id type to track. This will also get rid of the instanceof stuff further down.
Thanks. UnAckedMessageTracker is not only a member of ConsumerImpl; it should also be a member of PartitionedConsumer and TopicsConsumer here. Most of the change is to leverage MessageId::compareTo to make the code clearer, and it does not seem to cast the type too often here.
For both PartitionedConsumer and TopicsConsumer you know what kind of MessageId you will need.
For PartitionedConsumer you can create an UnAckedMessageTracker<MessageIdImpl>.
For ConsumerImpl you can create an UnAckedMessageTracker<MessageIdImpl>.
For TopicsConsumer you can create an UnAckedMessageTracker<TopicMessageIdImpl>.
Any cast, at all, is a code smell. Looking at this again, you should also have a specialization of UnAckedMessageTracker, and only this class should implement #removeTopicMessages.
Thanks, will make a class UnAckedTopicMessageTracker that extends UnAckedMessageTracker.
Github mauled my previous comment. Added some formatting to clarify.
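For readers following this thread, the generics suggestion might look roughly like the sketch below. It is not the code from this PR: SketchTracker, SketchTopicTracker and TopicScopedId are hypothetical, simplified stand-ins, and the ack-timeout handling of the real tracker is left out. It only shows a tracker parameterized on the id type, with removeTopicMessages living in the topic-aware specialization so the base class needs no instanceof checks.

```java
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a message id that carries its topic name.
final class TopicScopedId {
    final String topicName;
    final long entryId;

    TopicScopedId(String topicName, long entryId) {
        this.topicName = topicName;
        this.entryId = entryId;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicScopedId)) {
            return false;
        }
        TopicScopedId other = (TopicScopedId) o;
        return entryId == other.entryId && topicName.equals(other.topicName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topicName, entryId);
    }
}

// Simplified tracker, generic over the id type chosen by its creator.
class SketchTracker<T> {
    protected final Set<T> pending = ConcurrentHashMap.newKeySet();

    boolean add(T id) {
        return pending.add(id);
    }

    boolean remove(T id) {
        return pending.remove(id);
    }

    int size() {
        return pending.size();
    }
}

// Only the topic-aware specialization knows how to drop ids by topic.
class SketchTopicTracker extends SketchTracker<TopicScopedId> {
    // Removes every tracked id of the given topic and returns how many were removed.
    int removeTopicMessages(String topicName) {
        int before = pending.size();
        pending.removeIf(id -> id.topicName.equals(topicName));
        return before - pending.size();
    }
}
```

With this shape, a single-topic consumer would pick its own id type for SketchTracker, while a multi-topic consumer would hold a SketchTopicTracker.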
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopicsConsumerImpl extends ConsumerBase {
What bits of ConsumerBase are actually useful here? It looks like you've had to reimplement a large amount of it (ConsumerBase is ~350 LOC, this is ~900 LOC), while having to stub out all the HandlerBase stuff which doesn't apply in the case of this class.
The majority of ConsumerBase is just default implementations that call other implementations. Perhaps these could be moved up a layer in the class hierarchy so you could avoid pulling in the HandlerBase stuff completely.
HandlerBase seems useful only for ConsumerImpl and ProducerImpl. There is an open issue that plans to remove it from ConsumerBase.
Since ConsumerBase already does some of the work, I would like to leverage it rather than duplicate that work.
if (topics.isEmpty()) {
    this.namespaceName = null;
    setState(State.Ready);
    // We have successfully created N consumers, so we can start receiving messages now
N consumers? No consumers?
Thanks. This is an obsolete comment; will remove it. And yes, no topics are created here.
// Check topics are valid.
// - each topic is valid,
// - every topic has same namespace.
private static boolean topicsNameInvalid(Collection<String> topics) {
nit: topicNamesInvalid makes more sense. topicNamesValid would be even better IMO, but you'd have to invert the booleans.
Thanks, will change the name.
}

@Override
protected Message internalReceive() throws PulsarClientException {
Why don't this and the next method just call internalReceiveAsync()? It would save a lot of duplicate code.
Thanks. There seem to be some differences between the sync and async calls, and ConsumerImpl and PartitionedConsumerImpl also do it this way.

final String namespace = DestinationName.get(topics.stream().findFirst().get()).getNamespace();

Optional<String> result = topics.stream()
Should probably also check that topic names are unique.
Thanks. Will add it.
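The checks discussed in the last two threads (every topic in the same namespace, no duplicate names, and the positive topicNamesValid naming) could be combined along these lines. This is only a sketch under assumptions: TopicNameChecks is a hypothetical helper, the namespaceOf function stands in for whatever the client uses to extract a namespace (the diff above uses DestinationName), and rejecting an empty collection is a choice made here for illustration, not something this PR states.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;

// Hypothetical helper; not part of the Pulsar client.
final class TopicNameChecks {
    private TopicNameChecks() {
    }

    // Returns true when the collection is non-empty, has no null, empty or duplicate
    // names, and namespaceOf maps every topic to the same non-null namespace.
    static boolean topicNamesValid(Collection<String> topics,
                                   Function<String, String> namespaceOf) {
        if (topics == null || topics.isEmpty()) {
            return false; // assumption: an empty topic list is treated as invalid
        }
        Set<String> seen = new HashSet<>();
        String expectedNamespace = null;
        for (String topic : topics) {
            if (topic == null || topic.isEmpty() || !seen.add(topic)) {
                return false; // missing or duplicate topic name
            }
            String namespace = namespaceOf.apply(topic);
            if (namespace == null) {
                return false;
            }
            if (expectedNamespace == null) {
                expectedNamespace = namespace;
            } else if (!expectedNamespace.equals(namespace)) {
                return false; // topics span more than one namespace
            }
        }
        return true;
    }
}
```

Passing the namespace extraction in as a function keeps the sketch independent of any particular topic-name format.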
AtomicInteger completed = new AtomicInteger(numberTopicPartitions.get());
CompletableFuture<Void> closeFuture = new CompletableFuture<>();

consumers.values().stream().forEach(consumer -> {
return CompletableFuture.allOf(
        consumers.values().stream()
            .map(c -> c.closeAsync())
            .toArray(CompletableFuture[]::new))
    .whenComplete((res, ex) -> {
        unAckedMessageTracker.close();
        client.cleanupConsumer(TopicsConsumerImpl.this);
        // fail all pending-receive futures to notify application
        failPendingReceive();
    });
Thanks. Will change it.
 */
private void addConsumerForOneTopic(ConsumerImpl consumer,
                                    String topicName,
                                    AtomicInteger numPartitionsToCreate,
It'd be nicer to replace this AtomicInteger + AtomicReference pattern with addConsumerForOneTopic returning a future, and then have the caller of addConsumerForOneTopic use CompletableFuture.allOf to do something when all futures complete successfully, as in another comment I made above.
Thanks, will change it.
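The future-based shape suggested here might be sketched as below. SubscribeSketch and its subscribeAll method are hypothetical names, and the per-topic subscription is passed in as a function so the example stays self-contained; in the PR this role would be played by addConsumerForOneTopic returning a CompletableFuture. The point is that CompletableFuture.allOf replaces the manual AtomicInteger countdown and the AtomicReference used to remember a failure.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper illustrating the allOf pattern; not the PR's code.
final class SubscribeSketch {
    private SubscribeSketch() {
    }

    // Starts one subscription per topic and returns a future that completes when
    // all of them have completed, exceptionally if any of them failed.
    static CompletableFuture<Void> subscribeAll(
            List<String> topics,
            Function<String, CompletableFuture<Void>> addConsumerForOneTopic) {
        List<CompletableFuture<Void>> futures = topics.stream()
                .map(addConsumerForOneTopic)
                .collect(Collectors.toList());
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }
}
```

A caller could then chain thenRun to mark the consumer ready and exceptionally to clean up any consumers that were already created.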

/**
 * Get the topic name of this message.
 * This is mainly for TopicsConsumerImpl to identify a message belongs to which topic.
"TopicsConsumerImpl": we shouldn't be referring to implementation classes from the API javadoc.
Thanks. Will change it.
 *
 * @return the topic name
 */
default String getTopicName() {
We shouldn't need to use default here; we can always return null in the implementation if needed.
Thanks. Will change it.
 * Get the topic name of this message.
 * This is mainly for TopicsConsumerImpl to identify a message belongs to which topic.
 *
 * @return the topic name
the name of the topic on which this message was published
 * @return The {@code Consumer} object
 * @throws PulsarClientException
 */
Consumer subscribe(Collection<String> topics, String subscription) throws PulsarClientException;
These methods should be converted to the builder API from #1089.

import org.apache.pulsar.client.api.MessageId;

public class TopicMessageIdImpl implements MessageId {
This could extend MessageIdImpl instead of composing.
+1. This would also reduce the amount of change needed elsewhere.
Thanks. Here TopicMessageIdImpl is mainly a wrapper for MessageIdImpl. We need to keep a reference to the MessageIdImpl, because it will be used by one internal sub-consumer.
If extending, the constructor and getInnerMessageIdInner may not be easy to handle.
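To make the wrapper argument concrete, here is a rough sketch of the composition approach described in this reply. TopicIdWrapper and AckRouter are hypothetical names, and the generic parameter I stands in for the inner id type (MessageIdImpl in the PR). The wrapper adds only the topic name, and the multi-topic consumer uses that name to hand the untouched inner id to the right internal consumer.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical wrapper: pairs an inner id with the topic it came from.
final class TopicIdWrapper<I> {
    private final String topicName;
    private final I innerMessageId;

    TopicIdWrapper(String topicName, I innerMessageId) {
        this.topicName = topicName;
        this.innerMessageId = innerMessageId;
    }

    String getTopicName() {
        return topicName;
    }

    I getInnerMessageId() {
        return innerMessageId;
    }
}

// Hypothetical router: one acknowledge callback per internal consumer.
// Note: Consumer here is java.util.function.Consumer, not the Pulsar Consumer interface.
final class AckRouter<I> {
    private final Map<String, Consumer<I>> ackByTopic = new ConcurrentHashMap<>();

    void register(String topicName, Consumer<I> acknowledge) {
        ackByTopic.put(topicName, acknowledge);
    }

    // Forwards the inner id to the consumer of its topic; returns false if no
    // consumer is registered for that topic.
    boolean acknowledge(TopicIdWrapper<I> id) {
        Consumer<I> ack = ackByTopic.get(id.getTopicName());
        if (ack == null) {
            return false;
        }
        ack.accept(id.getInnerMessageId());
        return true;
    }
}
```

The trade-off raised by the reviewers still applies: a wrapper is not itself a MessageIdImpl, so code that expects that type has to unwrap it first.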
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;

public class TopicMessageImpl implements Message {
Same here, could we just extend MessageImpl?
Thanks. Here it is also mainly a wrapper for the original MessageImpl, which will be used by one internal sub-consumer. With extension, the constructor and getter may not be easy to handle.
protected NamespaceName namespaceName;

// Map <topic+partition, consumer>, when get do ACK, consumer will by find by topic name
private final ConcurrentHashMap<String, ConsumerImpl> consumers;
Most of this logic is very similar to the PartitionedConsumerImpl; we should try to share and reuse as much as possible from that code, since there are a lot of nuances, especially in the consumer part.
Thanks. Yes, there is a plan to make this extend PartitionedConsumerImpl, which is also why this change keeps using a lot of the same interfaces.
But it may be good to keep PartitionedConsumerImpl and TopicsConsumerImpl separate at first; this avoids introducing bugs into PartitionedConsumerImpl.
I think 99% of the logic should be unchanged from PartitionedConsumerImpl; the only differences would be in the constructor (a list of topics vs. the number of partitions, which can easily be converted into a list of topics anyway) and in using the partition index in the message id (where, again, we could use the "partition" topic even in that case).
Yes, the changes to TopicMessageIdImpl, TopicMessageImpl, and UnAckedMessageTracker are aimed at making PartitionedConsumerImpl and TopicsConsumerImpl easy to merge in the future. Opened issue #1236 to track this.
I would suggest just renaming PartitionedConsumerImpl, changing the constructor to accept a list of topics (rather than the number of partitions), and switching to use TopicMessageIdImpl. I don't think that UnAckedMessageTracker usage would need to be changed.
That would avoid adding a lot of new code and having to merge it later.
Opened issue #1237 to track using the builder to create the topics consumer.
@merlimat @ivankelly, thanks for your comments. Would you please help review it again?
👍
ping @ivankelly, can you take a look at the latest change so that we can move forward with this feature for the 2.0 release?
There's still too much instanceof and casting going on, but approved to unblock the patchset.
@zhaijack Since the consumer builder changes went in already, please update the API to just be available through the builder.
@merlimat, thanks for the reminder, will do the change for the builder.
In PR #1103, we added the Multi-Topics-Consumer to the Java client. This is catch-up work to add it to the C++ client.
Motivation
This is the first sub-task for PIP-13, which handles subscription to topics under the same namespace.
Modifications
PulsarClient and PulsarClientImpl.
TopicsConsumerImpl, TopicMessageImpl, TopicMessageIdImpl, add some tests.
UnAckedMessageTracker from MessageIdImpl to MessageId.
Result
The behaviour of the old methods is not changed; users can use the new method to subscribe to topics.