From e71f6ebc81d24d5f6aad4ebc96d42e56c5eae3c6 Mon Sep 17 00:00:00 2001 From: Hao Li <1127478+lihaosky@users.noreply.github.com> Date: Fri, 29 Sep 2023 10:16:29 -0700 Subject: [PATCH] MINOR: only log error when rack aware assignment is enabled (#14415) Reviewers: Lucas Brutschy , Matthias J. Sax --- .../internals/assignment/RackAwareTaskAssignor.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java index 0c8f9492a3e6f..18e3d78d4d78e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java @@ -212,8 +212,11 @@ private boolean validateClientRack(final Map> KeyValue previousRackInfo = null; for (final Map.Entry> rackEntry : entry.getValue().entrySet()) { if (!rackEntry.getValue().isPresent()) { - log.error(String.format("RackId doesn't exist for process %s and consumer %s", - processId, rackEntry.getKey())); + if (!StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy)) { + log.error( + String.format("RackId doesn't exist for process %s and consumer %s", + processId, rackEntry.getKey())); + } return false; } if (previousRackInfo == null) { @@ -232,7 +235,9 @@ private boolean validateClientRack(final Map> } } if (previousRackInfo == null) { - log.error(String.format("RackId doesn't exist for process %s", processId)); + if (!StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy)) { + log.error(String.format("RackId doesn't exist for process %s", processId)); + } return false; } racksForProcess.put(entry.getKey(), previousRackInfo.value);