diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 60739891a9a43..f84bb3d4a67bd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -565,7 +565,7 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr // there are two cases where we need to construct the prevTasks from the ownedPartitions: // 1) COOPERATIVE clients on version 2.4-2.5 do not encode active tasks and rely on ownedPartitions instead // 2) future clientduring version probing: we can't decode the future subscription info's prev tasks - if (!state.ownedPartitions().isEmpty() && (uuid == futureId ||state.prevActiveTasks().isEmpty())) { + if (!state.ownedPartitions().isEmpty() && (uuid == futureId || state.prevActiveTasks().isEmpty())) { final Set previousActiveTasks = new HashSet<>(); for (final Map.Entry partitionEntry : state.ownedPartitions().entrySet()) { final TopicPartition tp = partitionEntry.getKey(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 96059031b7709..9ece80b09d389 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -72,6 +72,7 @@ import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION; +import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.ACTIVE_TASK_SENTINEL_OFFSET; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.MatcherAssert.assertThat; @@ -232,10 +233,10 @@ private static SubscriptionInfo getInfo(final int version, } private static SubscriptionInfo getInfo(final UUID processId, - final Map taskLags, + final Map taskOffsetSums, final String userEndPoint) { return new SubscriptionInfo( - LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, userEndPoint, taskLags); + LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, userEndPoint, taskOffsetSums); } private static SubscriptionInfo getInfo(final UUID processId, final Set prevTasks, final Set standbyTasks) { @@ -1982,10 +1983,10 @@ private static void assertEquivalentAssignment(final Map> t } } - static Map getTaskOffsetSums(final Set activeTasks, final Set standbyTasks) { - final Map taskLags = activeTasks.stream().collect(Collectors.toMap(t -> t, t -> -1)); - taskLags.putAll(standbyTasks.stream().collect(Collectors.toMap(t -> t, t -> 0))); - return taskLags; + static Map getTaskOffsetSums(final Set activeTasks, final Set standbyTasks) { + final Map taskOffsetSums = activeTasks.stream().collect(Collectors.toMap(t -> t, t -> ACTIVE_TASK_SENTINEL_OFFSET)); + taskOffsetSums.putAll(standbyTasks.stream().collect(Collectors.toMap(t -> t, t -> 0L))); + return taskOffsetSums; } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacySubscriptionInfoSerde.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacySubscriptionInfoSerde.java index 8ea2b9d41edc6..e8976ee53ea48 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacySubscriptionInfoSerde.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacySubscriptionInfoSerde.java @@ -18,9 +18,6 @@ import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION; -import java.util.HashMap; -import java.util.Map; -import java.util.stream.Collectors; import org.apache.kafka.streams.errors.TaskAssignmentException; import org.apache.kafka.streams.processor.TaskId; import org.slf4j.Logger;