KAFKA-2388 [WIP]; refactor KafkaConsumer subscribe API #139
Conversation
@@ -127,6 +127,15 @@ public synchronized void addTopics(String... topics) {
}

/**
 * Add one or more topics to maintain metadata for
 */
public synchronized void addTopics(Iterable<String> topics) {
Just out of curiosity: any special reason to use an Iterable instead of a Collection?
No, not really. I think at first I made it iterable in the hope that I'd be able to delegate addTopics(String...) to this method, but that didn't work. I left it since it didn't seem to make much difference.
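One plausible reading of "that didn't work" is that a varargs parameter arrives as a String[], and an array is not itself an Iterable<String>, so the varargs overload cannot hand its argument straight to addTopics(Iterable<String>) without an explicit adapter such as Arrays.asList. A minimal sketch of the two overloads (class and field names are illustrative, not Kafka's actual code):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch, not Kafka's actual Metadata class: shows why the
// varargs overload needs an explicit Arrays.asList adapter to delegate to
// the Iterable overload (a String[] is not an Iterable<String>).
public class TopicSet {
    private final List<String> topics = new ArrayList<>();

    public synchronized void addTopics(Iterable<String> newTopics) {
        for (String topic : newTopics)
            topics.add(topic);
    }

    public synchronized void addTopics(String... newTopics) {
        // Explicitly wrap the array so the Iterable overload can consume it.
        addTopics(Arrays.asList(newTopics));
    }

    public synchronized List<String> topics() {
        // Return a copy so callers cannot mutate internal state.
        return new ArrayList<>(topics);
    }
}
```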
kafka-trunk-git-pr #143 SUCCESS
if (topic.equals(tp.topic()))
    clearPartition(tp);
// Remove any assigned partitions which are no longer subscribed to
Iterator<Map.Entry<TopicPartition, TopicPartitionState>> iterator = assignment.entrySet().iterator();
Lines 93 to 98 can be replaced by:
assignment.keySet().retainAll(subscription);
Same effect, as we want the intersection of assignment and subscription.
Good call.
Hmm... This works for the case below, but here we need to pull out the topic from the topic partition to check against the subscription.
Oops, you're right, my fault. :( I glanced over it really quickly, but with the prospect of doing a more comprehensive review/reading (incl. compilation, etc.) on Monday. :)
Well, so we can rewrite it as:
for (Iterator<TopicPartition> it = assignment.keySet().iterator(); it.hasNext(); ) {
    TopicPartition tp = it.next();
    if (!subscription.contains(tp.topic()))
        it.remove();
}
This simplifies the code a bit, at least. No need to get the entrySet, as we are only interested in the keys. ;)
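To make the pitfall concrete, here is a self-contained sketch of the pruning discussed above (the TopicPartition stand-in and the prune helper are illustrative, not Kafka's actual SubscriptionState). retainAll on the key set would compare TopicPartition keys against topic-name Strings, which are never equal, so it would wrongly drop every entry; the iterator loop keyed on tp.topic() does the right thing.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

public class AssignmentPruner {

    // Minimal stand-in for Kafka's TopicPartition, just enough for the demo.
    static class TopicPartition {
        private final String topic;
        private final int partition;
        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
        String topic() { return topic; }
    }

    // Remove any assigned partitions whose topic is no longer subscribed.
    // Note: assignment.keySet().retainAll(subscription) would NOT work here,
    // because the keys are TopicPartitions while subscription holds Strings.
    static <V> void prune(Map<TopicPartition, V> assignment, Set<String> subscription) {
        for (Iterator<TopicPartition> it = assignment.keySet().iterator(); it.hasNext(); ) {
            TopicPartition tp = it.next();
            if (!subscription.contains(tp.topic()))
                it.remove();
        }
    }
}
```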
@hachikuji left a few more comments regarding
@eribeiro Thanks for the review!
public Set<String> subscription() {
    acquire();
    try {
        return Collections.unmodifiableSet(this.subscriptions.subscription());
Okay, this is subtle but worth noting, imho. The javadoc states that this method returns a snapshot of the current subscription (an immutable snapshot, by the way). But by wrapping it in Collections.unmodifiableSet, this method is returning a dynamic unmodifiable view of the underlying Set (subscription). Therefore, when the subscription Set underneath changes, the Set view returned by this method will change too, so it's not a snapshot anymore. If the goal is to return an immutable snapshot Set, then we should return a defensive copy, as below:
return Collections.unmodifiableSet(new HashSet<>(this.subscriptions.subscription()));
Then we are effectively returning an immutable snapshot of the subscriptions (not even the class that returned it can modify it anymore). Does it make sense?
Yep, makes perfect sense. I think we should just return a copy instead of a view into the underlying collection. I don't know if users get any real value from the latter. Seems more likely to cause confusion.
+1
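The view-vs-snapshot distinction agreed on above can be demonstrated in isolation. This is a standalone sketch (class and variable names are ours, not Kafka's): the first method wraps the live set directly and so observes later mutations, while the second copies first and so does not.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Illustrative demo of Collections.unmodifiableSet: view vs snapshot.
public class SnapshotDemo {

    // Wrap the live set directly: the result is an unmodifiable *view*,
    // so later changes to the backing set show through it.
    public static int viewSizeAfterMutation() {
        Set<String> subscription = new HashSet<>();
        subscription.add("topicA");
        Set<String> view = Collections.unmodifiableSet(subscription);
        subscription.add("topicB");
        return view.size(); // 2: the view tracks the backing set
    }

    // Copy first, then wrap: the result is a true immutable snapshot.
    public static int snapshotSizeAfterMutation() {
        Set<String> subscription = new HashSet<>();
        subscription.add("topicA");
        Set<String> snapshot = Collections.unmodifiableSet(new HashSet<>(subscription));
        subscription.add("topicB");
        return snapshot.size(); // 1: the snapshot is unaffected
    }

    public static void main(String[] args) {
        System.out.println("view: " + viewSizeAfterMutation()
                + ", snapshot: " + snapshotSizeAfterMutation());
    }
}
```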
Force-pushed from 5dd81ae to 6303965
kafka-trunk-git-pr #162 FAILURE
kafka-trunk-git-pr #163 FAILURE
* hasn't happened yet, or the partitions are in the process of getting reassigned).
* The set of partitions currently assigned to this consumer. If subscription happened by directly assigning
* partitions using {@link #assign(List)} then this will simply return the same partitions that
* were assigned. If topic subscription was used, then this will give the set of topics currently assigned
Minor comment issue: should say "then this will give the set of topic partitions"
kafka-trunk-git-pr #220 SUCCESS
@@ -48,7 +48,7 @@
* Here is pseudo-code for a callback implementation for saving offsets:
* <pre>
* {@code
* public class SaveOffsetsOnRebalance implements ConsumerRebalanceCallback {
Any particular reason you want to rename Callback to Listener?
I was thinking that Listener makes it clear that it will be invoked multiple times. In the other cases where we use callbacks, they are invoked just once. I don't have a strong preference, however.
LGTM, @hachikuji could you rebase?
kafka-trunk-git-pr #230 FAILURE
@guozhangwang The build failure appears to be unrelated. The two test failures are tracked in KAFKA-2442 and KAFKA-2455.
LGTM. |
This patch refactors KafkaCruiseControl to use MetricSampleAggregator provided by cruise-control-core
Squashed commits:
- [LI-HOTFIX] add Bintray support to LinkedIn Kafka Github
- [LI-HOTFIX] Migrate bintray publish to JFrog (apache#132)
- [LI-HOTFIX] Use JFrog Api key instead of password (apache#139)
TICKET = N/A
LI_DESCRIPTION = DEPENG-2065. This is advised by dep engineering
EXIT_CRITERIA = When not using JFrog for publishing
No description provided.