Skip to content

Commit

Permalink
MINOR: Various cleanups in clients (apache#16193)
Browse files Browse the repository at this point in the history
Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
mimaison authored Jun 6, 2024
1 parent a41f7a4 commit 79ea7d6
Show file tree
Hide file tree
Showing 96 changed files with 264 additions and 374 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ static List<InetAddress> resolve(String host, HostResolver hostResolver) throws
InetAddress[] addresses = hostResolver.resolve(host);
List<InetAddress> result = filterPreferredAddresses(addresses);
if (log.isDebugEnabled())
log.debug("Resolved host {} as {}", host, result.stream().map(i -> i.getHostAddress()).collect(Collectors.joining(",")));
log.debug("Resolved host {} as {}", host, result.stream().map(InetAddress::getHostAddress).collect(Collectors.joining(",")));
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public Iterable<NetworkClient.InFlightRequest> clearAll(String node) {
} else {
final Deque<NetworkClient.InFlightRequest> clearedRequests = requests.remove(node);
inFlightRequestCount.getAndAdd(-clearedRequests.size());
return () -> clearedRequests.descendingIterator();
return clearedRequests::descendingIterator;
}
}

Expand Down
4 changes: 2 additions & 2 deletions clients/src/main/java/org/apache/kafka/clients/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,8 @@ public synchronized Set<TopicPartition> updatePartitionLeadership(Map<TopicParti
// Get topic-ids for updated topics from existing topic-ids.
Map<String, Uuid> existingTopicIds = this.metadataSnapshot.topicIds();
Map<String, Uuid> topicIdsForUpdatedTopics = updatedTopics.stream()
.filter(e -> existingTopicIds.containsKey(e))
.collect(Collectors.toMap(e -> e, e -> existingTopicIds.get(e)));
.filter(existingTopicIds::containsKey)
.collect(Collectors.toMap(e -> e, existingTopicIds::get));

if (log.isDebugEnabled()) {
updatePartitionMetadata.forEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ public DescribeConsumerGroupsResult(final Map<String, KafkaFuture<ConsumerGroupD
* Return a map from group id to futures which yield group descriptions.
*/
public Map<String, KafkaFuture<ConsumerGroupDescription>> describedGroups() {
Map<String, KafkaFuture<ConsumerGroupDescription>> describedGroups = new HashMap<>();
futures.forEach((key, future) -> describedGroups.put(key, future));
return describedGroups;
return new HashMap<>(futures);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ public class DescribeDelegationTokenOptions extends AbstractOptions<DescribeDele
private List<KafkaPrincipal> owners;

/**
* if owners is null, all the user owned tokens and tokens where user have Describe permission
* If owners is null, all the user owned tokens and tokens where user have Describe permission
* will be returned.
* @param owners
* @param owners The owners that we want to describe delegation tokens for
* @return this instance
*/
public DescribeDelegationTokenOptions owners(List<KafkaPrincipal> owners) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public Map<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>
return descriptions().entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> entry.getValue().thenApply(map -> convertMapValues(map))));
entry -> entry.getValue().thenApply(this::convertMapValues)));
}

@SuppressWarnings("deprecation")
Expand Down Expand Up @@ -88,7 +88,7 @@ public Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions()
@Deprecated
public KafkaFuture<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> all() {
return allDescriptions().thenApply(map -> map.entrySet().stream().collect(Collectors.toMap(
entry -> entry.getKey(),
Map.Entry::getKey,
entry -> convertMapValues(entry.getValue())
)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,18 @@ public Map<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> values() {
* Return a future which succeeds if log directory information of all replicas are available
*/
public KafkaFuture<Map<TopicPartitionReplica, ReplicaLogDirInfo>> all() {
return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
thenApply(new KafkaFuture.BaseFunction<Void, Map<TopicPartitionReplica, ReplicaLogDirInfo>>() {
@Override
public Map<TopicPartitionReplica, ReplicaLogDirInfo> apply(Void v) {
Map<TopicPartitionReplica, ReplicaLogDirInfo> replicaLogDirInfos = new HashMap<>();
for (Map.Entry<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> entry : futures.entrySet()) {
try {
replicaLogDirInfos.put(entry.getKey(), entry.getValue().get());
} catch (InterruptedException | ExecutionException e) {
// This should be unreachable, because allOf ensured that all the futures completed successfully.
throw new RuntimeException(e);
}
return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]))
.thenApply(v -> {
Map<TopicPartitionReplica, ReplicaLogDirInfo> replicaLogDirInfos = new HashMap<>();
for (Map.Entry<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>> entry : futures.entrySet()) {
try {
replicaLogDirInfos.put(entry.getKey(), entry.getValue().get());
} catch (InterruptedException | ExecutionException e) {
// This should be unreachable, because allOf ensured that all the futures completed successfully.
throw new RuntimeException(e);
}
return replicaLogDirInfos;
}
return replicaLogDirInfos;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2200,9 +2200,9 @@ Call generateDescribeTopicsCallWithDescribeTopicPartitionsApi(
long now
) {
final Map<String, TopicRequest> topicsRequests = new LinkedHashMap<>();
topicNamesList.stream().sorted().forEach(topic -> {
topicsRequests.put(topic, new TopicRequest().setName(topic));
});
topicNamesList.stream().sorted().forEach(topic ->
topicsRequests.put(topic, new TopicRequest().setName(topic))
);
return new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()),
new LeastLoadedNodeProvider()) {
TopicDescription partiallyFinishedTopicDescription = null;
Expand Down Expand Up @@ -3048,7 +3048,7 @@ public DescribeLogDirsRequest.Builder createRequest(int timeoutMs) {
public void handleResponse(AbstractResponse abstractResponse) {
DescribeLogDirsResponse response = (DescribeLogDirsResponse) abstractResponse;
Map<String, LogDirDescription> descriptions = logDirDescriptions(response);
if (descriptions.size() > 0) {
if (!descriptions.isEmpty()) {
future.complete(descriptions);
} else {
// Up to v3 DescribeLogDirsResponse did not have an error code field, hence it defaults to None
Expand Down Expand Up @@ -3560,10 +3560,10 @@ private void maybeAddConsumerGroup(ListGroupsResponseData.ListedGroup group) {
String protocolType = group.protocolType();
if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) {
final String groupId = group.groupId();
final Optional<ConsumerGroupState> state = group.groupState().equals("")
final Optional<ConsumerGroupState> state = group.groupState().isEmpty()
? Optional.empty()
: Optional.of(ConsumerGroupState.parse(group.groupState()));
final Optional<GroupType> type = group.groupType().equals("")
final Optional<GroupType> type = group.groupType().isEmpty()
? Optional.empty()
: Optional.of(GroupType.parse(group.groupType()));
final ConsumerGroupListing groupListing = new ConsumerGroupListing(
Expand Down Expand Up @@ -4215,9 +4215,9 @@ public void handleResponse(AbstractResponse abstractResponse) {
* Be sure to do this after the NOT_CONTROLLER error check above
* so that all errors are consistent in that case.
*/
userIllegalAlterationExceptions.entrySet().stream().forEach(entry -> {
futures.get(entry.getKey()).completeExceptionally(entry.getValue());
});
userIllegalAlterationExceptions.entrySet().stream().forEach(entry ->
futures.get(entry.getKey()).completeExceptionally(entry.getValue())
);
response.data().results().forEach(result -> {
KafkaFutureImpl<Void> future = futures.get(result.user());
if (future == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ public KafkaFuture<Map<String, TopicListing>> namesToListings() {
* Return a future which yields a collection of TopicListing objects.
*/
public KafkaFuture<Collection<TopicListing>> listings() {
return future.thenApply(namesToDescriptions -> namesToDescriptions.values());
return future.thenApply(Map::values);
}

/**
* Return a future which yields a collection of topic names.
*/
public KafkaFuture<Set<String>> names() {
return future.thenApply(namesToListings -> namesToListings.keySet());
return future.thenApply(Map::keySet);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public ListTransactionsOptions filterOnDuration(long durationMs) {
/**
* Returns the set of states to be filtered or empty if no states have been specified.
*
* @return the current set of filtered states (empty means that no states are filtered and all
* @return the current set of filtered states (empty means that no states are filtered and
* all transactions will be returned)
*/
public Set<TransactionState> filteredStates() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public KafkaFuture<Map<Integer, Collection<TransactionListing>>> allByBrokerId()
}

Set<Integer> remainingResponses = new HashSet<>(map.keySet());
map.forEach((brokerId, future) -> {
map.forEach((brokerId, future) ->
future.whenComplete((listings, brokerException) -> {
if (brokerException != null) {
allFuture.completeExceptionally(brokerException);
Expand All @@ -115,8 +115,8 @@ public KafkaFuture<Map<Integer, Collection<TransactionListing>>> allByBrokerId()
allFuture.complete(allListingsMap);
}
}
});
});
})
);
});

return allFuture;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,16 +212,16 @@ private ApiResult<CoordinatorKey, ConsumerGroupDescription> handledConsumerGroup
final Set<AclOperation> authorizedOperations = validAclOperations(describedGroup.authorizedOperations());
final List<MemberDescription> memberDescriptions = new ArrayList<>(describedGroup.members().size());

describedGroup.members().forEach(groupMember -> {
describedGroup.members().forEach(groupMember ->
memberDescriptions.add(new MemberDescription(
groupMember.memberId(),
Optional.ofNullable(groupMember.instanceId()),
groupMember.clientId(),
groupMember.clientHost(),
new MemberAssignment(convertAssignment(groupMember.assignment())),
Optional.of(new MemberAssignment(convertAssignment(groupMember.targetAssignment())))
));
});
))
);

final ConsumerGroupDescription consumerGroupDescription =
new ConsumerGroupDescription(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ private void failAllPartitionsForTopic(
Map<TopicPartition, Throwable> failed,
Function<TopicPartition, Throwable> exceptionGenerator
) {
partitions.stream().filter(tp -> tp.topic().equals(topic)).forEach(tp -> {
failed.put(tp, exceptionGenerator.apply(tp));
});
partitions.stream().filter(tp -> tp.topic().equals(topic)).forEach(tp ->
failed.put(tp, exceptionGenerator.apply(tp))
);
}

private void handlePartitionError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public interface ConsumerRebalanceListener {
* {@link #onPartitionsRevoked(Collection)} callback before any instance executes its
* {@link #onPartitionsAssigned(Collection)} callback. During exceptional scenarios, partitions may be migrated
* without the old owner being notified (i.e. their {@link #onPartitionsRevoked(Collection)} callback not triggered),
* and later when the old owner consumer realized this event, the {@link #onPartitionsLost(Collection)} (Collection)} callback
* and later when the old owner consumer realized this event, the {@link #onPartitionsLost(Collection)} callback
* will be triggered by the consumer then.
* <p>
* It is common for the assignment callback to use the consumer instance in order to query offsets. It is possible
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
}
}

toClear.forEach(p -> this.records.remove(p));
toClear.forEach(records::remove);
return new ConsumerRecords<>(results);
}

Expand All @@ -263,7 +263,7 @@ public synchronized void addRecord(ConsumerRecord<K, V> record) {
Set<TopicPartition> currentAssigned = this.subscriptions.assignedPartitions();
if (!currentAssigned.contains(tp))
throw new IllegalStateException("Cannot add records for a partition that is not assigned to the consumer");
List<ConsumerRecord<K, V>> recs = this.records.computeIfAbsent(tp, k -> new ArrayList<>());
List<ConsumerRecord<K, V>> recs = records.computeIfAbsent(tp, k -> new ArrayList<>());
recs.add(record);
}

Expand All @@ -286,8 +286,7 @@ public synchronized void setOffsetsException(KafkaException exception) {
@Override
public synchronized void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
ensureNotClosed();
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet())
committed.put(entry.getKey(), entry.getValue());
committed.putAll(offsets);
if (callback != null) {
callback.onComplete(offsets, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ private void assignRanges(TopicAssignmentState assignmentState,
private void assignWithRackMatching(Collection<TopicAssignmentState> assignmentStates,
Map<String, List<TopicPartition>> assignment) {

assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) -> {
assignmentStates.stream().collect(Collectors.groupingBy(t -> t.consumers)).forEach((consumers, states) ->
states.stream().collect(Collectors.groupingBy(t -> t.partitionRacks.size())).forEach((numPartitions, coPartitionedStates) -> {
if (coPartitionedStates.size() > 1)
assignCoPartitionedWithRackMatching(consumers, numPartitions, coPartitionedStates, assignment);
Expand All @@ -179,8 +179,8 @@ private void assignWithRackMatching(Collection<TopicAssignmentState> assignmentS
if (state.needsRackAwareAssignment)
assignRanges(state, state::racksMatch, assignment);
}
});
});
})
);
}

private void assignCoPartitionedWithRackMatching(LinkedHashMap<String, Optional<String>> consumers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,7 @@ static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData) {
topicAssignments.add(topicAssignment);
}
struct.set(TOPIC_PARTITIONS_KEY_NAME, topicAssignments.toArray());
if (memberData.generation.isPresent())
struct.set(GENERATION_KEY_NAME, memberData.generation.get());
memberData.generation.ifPresent(integer -> struct.set(GENERATION_KEY_NAME, integer));
ByteBuffer buffer = ByteBuffer.allocate(STICKY_ASSIGNOR_USER_DATA_V1.sizeOf(struct));
STICKY_ASSIGNOR_USER_DATA_V1.write(buffer, struct);
buffer.flip();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;

import java.util.Objects;
Expand All @@ -51,7 +50,6 @@
*/
public class CoordinatorRequestManager implements RequestManager {
private static final long COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS = 60 * 1000;
private final Time time;
private final Logger log;
private final BackgroundEventHandler backgroundEventHandler;
private final String groupId;
Expand All @@ -62,15 +60,13 @@ public class CoordinatorRequestManager implements RequestManager {
private Node coordinator;

public CoordinatorRequestManager(
final Time time,
final LogContext logContext,
final long retryBackoffMs,
final long retryBackoffMaxMs,
final BackgroundEventHandler errorHandler,
final String groupId
) {
Objects.requireNonNull(groupId);
this.time = time;
this.log = logContext.logger(this.getClass());
this.backgroundEventHandler = errorHandler;
this.groupId = groupId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
sentFields.rebalanceTimeoutMs = rebalanceTimeoutMs;
}

// SubscribedTopicNames - only sent if has changed since the last heartbeat
// SubscribedTopicNames - only sent if it has changed since the last heartbeat
TreeSet<String> subscribedTopicNames = new TreeSet<>(this.subscriptions.subscription());
if (sendAllFields || !subscribedTopicNames.equals(sentFields.subscribedTopicNames)) {
data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ protected RequestManagers create() {

if (groupRebalanceConfig != null && groupRebalanceConfig.groupId != null) {
Optional<String> serverAssignor = Optional.ofNullable(config.getString(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
coordinator = new CoordinatorRequestManager(time,
coordinator = new CoordinatorRequestManager(
logContext,
retryBackoffMs,
retryBackoffMaxMs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ TxnPartitionEntry get(TopicPartition topicPartition) {
}

TxnPartitionEntry getOrCreate(TopicPartition topicPartition) {
return topicPartitions.computeIfAbsent(topicPartition, tp -> new TxnPartitionEntry(tp));
return topicPartitions.computeIfAbsent(topicPartition, TxnPartitionEntry::new);
}

boolean contains(TopicPartition topicPartition) {
Expand Down
4 changes: 2 additions & 2 deletions clients/src/main/java/org/apache/kafka/common/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,8 @@ public Node nodeById(int id) {

/**
* Get the node by node id if the replica for the given partition is online
* @param partition
* @param id
* @param partition The TopicPartition
* @param id The node id
* @return the node
*/
public Optional<Node> nodeIfOnline(TopicPartition partition, int id) {
Expand Down
Loading

0 comments on commit 79ea7d6

Please sign in to comment.