Skip to content

Commit

Permalink
MINOR: only log error when rack aware assignment is enabled (apache#1…
Browse files Browse the repository at this point in the history
…4415)

Reviewers:  Lucas Brutschy <[email protected]>, Matthias J. Sax <[email protected]>
  • Loading branch information
lihaosky authored Sep 29, 2023
1 parent 03259f6 commit e71f6eb
Showing 1 changed file with 8 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,11 @@ private boolean validateClientRack(final Map<UUID, Map<String, Optional<String>>
KeyValue<String, String> previousRackInfo = null;
for (final Map.Entry<String, Optional<String>> rackEntry : entry.getValue().entrySet()) {
if (!rackEntry.getValue().isPresent()) {
log.error(String.format("RackId doesn't exist for process %s and consumer %s",
processId, rackEntry.getKey()));
if (!StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy)) {
log.error(
String.format("RackId doesn't exist for process %s and consumer %s",
processId, rackEntry.getKey()));
}
return false;
}
if (previousRackInfo == null) {
Expand All @@ -232,7 +235,9 @@ private boolean validateClientRack(final Map<UUID, Map<String, Optional<String>>
}
}
if (previousRackInfo == null) {
log.error(String.format("RackId doesn't exist for process %s", processId));
if (!StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy)) {
log.error(String.format("RackId doesn't exist for process %s", processId));
}
return false;
}
racksForProcess.put(entry.getKey(), previousRackInfo.value);
Expand Down

0 comments on commit e71f6eb

Please sign in to comment.