diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java index 05e078f66..18322bf68 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java @@ -19,8 +19,8 @@ import org.apache.flink.annotation.Internal; +import java.util.ArrayList; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Set; @@ -130,21 +130,18 @@ public List discoverPartitions() throws WakeupException, Cl // topics or a topic pattern if (topicsDescriptor.isFixedTopics()) { newDiscoveredPartitions = - getAllPartitionsForTopics(topicsDescriptor.getFixedTopics()); + new ArrayList<>( + getAllPartitionsForTopics(topicsDescriptor.getFixedTopics())); } else { - List matchedTopics = getAllTopics(); + List matchedTopics = new ArrayList<>(getAllTopics()); // retain topics that match the pattern - Iterator iter = matchedTopics.iterator(); - while (iter.hasNext()) { - if (!topicsDescriptor.isMatchingTopic(iter.next())) { - iter.remove(); - } - } + matchedTopics.removeIf(s -> !topicsDescriptor.isMatchingTopic(s)); - if (matchedTopics.size() != 0) { + if (!matchedTopics.isEmpty()) { // get partitions only for matched topics - newDiscoveredPartitions = getAllPartitionsForTopics(matchedTopics); + newDiscoveredPartitions = + new ArrayList<>(getAllPartitionsForTopics(matchedTopics)); } else { newDiscoveredPartitions = null; } @@ -157,14 +154,8 @@ public List discoverPartitions() throws WakeupException, Cl "Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor); } else { - Iterator iter = newDiscoveredPartitions.iterator(); - KafkaTopicPartition nextPartition; - while (iter.hasNext()) { - nextPartition = iter.next(); - if (!setAndCheckDiscoveredPartition(nextPartition)) { - iter.remove(); - } - } + newDiscoveredPartitions.removeIf( + nextPartition -> !setAndCheckDiscoveredPartition(nextPartition)); } return newDiscoveredPartitions;