Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIP-13-1/3: Provide TopicsConsumer to consume from several topics under same namespace #1103

Merged
merged 6 commits into from
Feb 21, 2018

Conversation

zhaijack
Copy link
Contributor

Motivation

This is a first sub-task for pip-13, which would like handle subscription to topics under same namespace.

Modifications

  • add subscribe methods in PulsarClient and PulsarClientImpl.
  • add TopicsConsumerImpl, TopicMessageImpl, TopicMessageIdImpl, add some test.
  • change parameter in UnAckedMessageTracker from MessageIdImpl to MessageId.

Result

old methods behaviour not changed,
user could use new method to subscribe to topics

@zhaijack zhaijack changed the title Issue 1100: Provide TopicsConsumer to consume from several topics under same namespace Issue 1100: PIP-13-1: Provide TopicsConsumer to consume from several topics under same namespace Jan 25, 2018
@zhaijack
Copy link
Contributor Author

retest this please

@zhaijack zhaijack changed the title Issue 1100: PIP-13-1: Provide TopicsConsumer to consume from several topics under same namespace Issue 1100: PIP-13-1/3: Provide TopicsConsumer to consume from several topics under same namespace Jan 26, 2018
@zhaijack zhaijack force-pushed the sub_regex_topics branch 2 times, most recently from 057e9c0 to f391fe2 Compare January 26, 2018 11:37
@merlimat merlimat added the type/feature The PR added a new feature or issue requested a new feature label Jan 26, 2018
@merlimat merlimat added this to the 2.0.0-incubating milestone Jan 26, 2018
@zhaijack zhaijack force-pushed the sub_regex_topics branch 2 times, most recently from 7a19964 to 43242c7 Compare January 31, 2018 15:58
@zhaijack zhaijack changed the title Issue 1100: PIP-13-1/3: Provide TopicsConsumer to consume from several topics under same namespace PIP-13-1/3: Provide TopicsConsumer to consume from several topics under same namespace Feb 1, 2018
*
* @return the topic name
*/
default String getTopicName() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this in both message and messageid?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that the implemenation needs to know which topic originated the message so that it can "acknowledge" on the right internal consumer, but I think this doesn't need to be exposed in the MessageId interface.

Copy link
Contributor Author

@zhaijack zhaijack Feb 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. It is as @merlimat metioned. for MessageId, we also need topicName for redeliverUnacknowledgedMessages.
will remove it from MessageId

@@ -468,17 +468,20 @@ public void redeliverUnacknowledgedMessages() {
}

@Override
public void redeliverUnacknowledgedMessages(Set<MessageIdImpl> messageIds) {
public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you changing this to MessageId? You're casting to MessageIdImpl anyhow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, As above reply, we also need this redeliverUnacknowledgedMessages method in Consumer.java handling TopicMessageIdImpl. would like to change this in Consumer.java and make the case in each child class.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Consumer.java it doesn't know about the MessageIds. The call takes no parameters.

TopicsMessageIdImpl should be a specialization of MessageIdImpl (I assumed it was when I originally commented, until i saw Matteo's comment to the same effect). If it is a specialization of TopicsMessageIdImpl, then this signature doesn't need to change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, it is in ConsumerBase.java, sorry for the wrong reference of Consumer.java.
TopicsMessageIdImpl is more need to be a wrapper for MessageIdImpl, and as the reply below, may be better to implements MessageId, instead of extends MessageIdImpl.

import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.util.Timeout;
import io.netty.util.TimerTask;

public class UnAckedMessageTracker implements Closeable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UnackedMessageTracker is a member of ConsumerImpl. Since there is a new ConsumerImpl in this patch, instead of changing the sets to MessageId, you should create a generics parameter on this class, and let the creator of the class decide which message id type to track. This will also get rid of the instanceof stuff further down.

Copy link
Contributor Author

@zhaijack zhaijack Feb 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. UnackedMessageTracker is not only a member of ConsumerImpl, it should also a member of PartitionedConsumer and TopicsConsumer here. Most of the change is to leverage MessageId::compareTo to make the code clearer. and seems it not casts the type too often here.

Copy link
Contributor

@ivankelly ivankelly Feb 12, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For both PartitionedConsumer and TopicsConsumer you know what kind of MessageId you will be need.
For PartitionedConsumer you can create a UnAckedMessageTracker<MessageIdImpl>.
For ConsumerConsumer you can create an UnAckedMessageTracker<MessageIdImpl>.
For TopicsConsumer you can create an UnAckedMessageTracker<TopicMessageIdImpl>.

Any cast, at all, is code smell. Looking at this again, you should also have a specialization of UnAckedMessageTracker, and only this class should implement #removeTopicMessages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, will make a class UnAckedTopicMessageTracker extends UnAckedMessageTracker

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Github mauled my previous comment. Added some formatting to clarify.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopicsConsumerImpl extends ConsumerBase {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What bits of ConsumerBase are actually useful here? It looks like you've had to reimplement a large amount of it (consumerbase is ~350LOC, this is ~900LOC), while having to stub out all the HandlerBase stuff which doesn't apply in the case of this class.

The majority of ConsumerBase is just default implementations that call other implementation. Perhaps these could be moved up a layer in the class hierarchy so you could avoid pulling in the HandlerBase stuff completely.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HandlerBase seems only useful for ConsumerImpl and ProducerImpl, There was an issue opened, which plan to remove it from ConsumerBase.
Since ConsumerBase has did some of the work, would like to leverage it and not to do replicated work.

if (topics.isEmpty()) {
this.namespaceName = null;
setState(State.Ready);
// We have successfully created N consumers, so we can start receiving messages now
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

N consumer? no consumers?

Copy link
Contributor Author

@zhaijack zhaijack Feb 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. This is an obsolete comments. will remove it. and Yes. here is no topics created.

// Check topics are valid.
// - each topic is valid,
// - every topic has same namespace.
private static boolean topicsNameInvalid(Collection<String> topics) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: topicNamesInvalid makes more sense. topicNamesValid would be even better IMO, but you'd have to invert the booleans.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, will change the name

}

@Override
protected Message internalReceive() throws PulsarClientException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why doesn't this and the next message just call internalReceiveAsync()? It would save a lot of duplicate code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Seems there is some different between sync and async call. and also ComsumerImpl and PartitionedConsumerImpl also doing like this.


final String namespace = DestinationName.get(topics.stream().findFirst().get()).getNamespace();

Optional<String> result = topics.stream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should probably check that topic names are unique also.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. will add it.

AtomicInteger completed = new AtomicInteger(numberTopicPartitions.get());
CompletableFuture<Void> closeFuture = new CompletableFuture<>();

consumers.values().stream().forEach(consumer -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return CompletaleFuture.allOf(
    consumers.values().stream().map(c -> consumer.closeAsync()))
    .whenComplete((res,ex) -> {
                     unAckedMessageTracker.close();
                     client.cleanupConsumer(TopicsConsumerImpl.this);
                     // fail all pending-receive futures to notify application
                     failPendingReceive();
             })

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. will change it.

*/
private void addConsumerForOneTopic(ConsumerImpl consumer,
String topicName,
AtomicInteger numPartitionsToCreate,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be nicer to replace this atomicInteger + atomicReference pattern, with addConsumerForOneTopic returning a future, and then the caller of addConsumerForOneTopic using CompleteableFuture.allOf to do something when all futures complete and are successful, as in another comment I made above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, will change it.


/**
* Get the topic name of this message.
* This is mainly for TopicsConsumerImpl to identify a message belongs to which topic.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"TopicsConsumerImpl" we shouldn't be referring to implemenation classes from the API javadoc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. will change it

*
* @return the topic name
*/
default String getTopicName() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't need to use default here, we can always return null in the implementation if needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks. will change it.

* Get the topic name of this message.
* This is mainly for TopicsConsumerImpl to identify a message belongs to which topic.
*
* @return the topic name
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the name of the topic on which this message was published

*
* @return the topic name
*/
default String getTopicName() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that the implemenation needs to know which topic originated the message so that it can "acknowledge" on the right internal consumer, but I think this doesn't need to be exposed in the MessageId interface.

* @return The {@code Consumer} object
* @throws PulsarClientException
*/
Consumer subscribe(Collection<String> topics, String subscription) throws PulsarClientException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These methods should be converted into builder API from #1089


import org.apache.pulsar.client.api.MessageId;

public class TopicMessageIdImpl implements MessageId {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be extending MessageIdImpl instead of composing

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. This would reduce the amount of change needed elsewhere also.

Copy link
Contributor Author

@zhaijack zhaijack Feb 13, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Here TopicMessageIdImpl is mainly a wrapper for MessageIdImpl, We need keep a reference of MessageIdImpl, because it will be used for one internal-sub-consumer.
If extending, the constructor and getInnerMessageIdInner may be not easy to handle.

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;

public class TopicMessageImpl implements Message {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, could we just extend MessageImpl?

Copy link
Contributor Author

@zhaijack zhaijack Feb 13, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Here it is also mainly a wrapper for original MessageImpl, the original MessageImpl will be used for one internal-sub-consumer. a extend may be not easy to handle constructor and get.

protected NamespaceName namespaceName;

// Map <topic+partition, consumer>, when get do ACK, consumer will by find by topic name
private final ConcurrentHashMap<String, ConsumerImpl> consumers;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of this logic is very similar to the PartitionedConsumerImpl, we should try to share and reuse as much as possible from that code, since there are a lot of nuances, especially in the consumer part.

Copy link
Contributor Author

@zhaijack zhaijack Feb 13, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Yes. There is a plan to make this extends PartitionedConsumerImpl, that is also the reason that this change keep use a lot of same interface.
But it maybe good to make PartitionedConsumerImpl and TopicsConsumerImpl separate at first, this could avoid bring bugs into PartitionedConsumerImpl.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think 99% of the logic should be unchanged with PartitionedConsumerImpl, the only difference would in constructor (a list of topics vs the number of partitions.. which can be easily be converted in a list of topics anyway), and using the partition index in the message id (which, again, we could use the "partition" topic anyway even in that case).

Copy link
Contributor Author

@zhaijack zhaijack Feb 14, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the changes for TopicMessageIdImpl, TopicMessageImpl, and UnAckedMessageTracker is aimed to make PartitionedConsumerImpl and TopicsConsumerImpl could be easy merged in the future. opened issue #1236 tracking this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest to just rename PartitionedConsumerImpl, change the constructor to accept a list of topics (rather than the number of partitions) and switch to use TopicMessageIdImpl. I don't think that UnAckedMessageTracker usage would need to be changed.

That would avoid to add a lot of new code and having to merge it later.

@zhaijack
Copy link
Contributor Author

opened issue #1237 tracking using builder to create topics consumer.

@zhaijack
Copy link
Contributor Author

@merlimat @ivankelly, Thanks for your comments, Would you please help review it again?

Copy link
Contributor

@merlimat merlimat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@sijie
Copy link
Member

sijie commented Feb 21, 2018

ping @ivankelly can you take a look at the latest change, so that we can move forward with this feature for 2.0 release.

Copy link
Contributor

@ivankelly ivankelly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's still too much instanceof and casting going on, but approved to unblock the patchset.

import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.util.Timeout;
import io.netty.util.TimerTask;

public class UnAckedMessageTracker implements Closeable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Github mauled my previous comment. Added some formatting to clarify.

@merlimat merlimat merged commit d073a7c into apache:master Feb 21, 2018
@merlimat
Copy link
Contributor

@zhaijack Since the consumer builder changes went in already, please update the API to just be available through the builder.

@zhaijack
Copy link
Contributor Author

@merlimat, Thanks for the reminder, will do the change for builder.

sijie pushed a commit that referenced this pull request Jul 23, 2018
In PR #1103, we add Multi-Topics-Consumer in java client.  This is a catch up work to add it in cpp client.
sijie pushed a commit that referenced this pull request Aug 27, 2018
In PR #1103, we add Multi-Topics-Consumer in java client.  This is a catch up work to add it in cpp client.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/feature The PR added a new feature or issue requested a new feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants