From 14b5c4d1e8589ff61faf855e6b64766001e06ecf Mon Sep 17 00:00:00 2001 From: Dongnuo Lyu <139248811+dongnuo123@users.noreply.github.com> Date: Thu, 23 May 2024 02:27:00 -0400 Subject: [PATCH] KAFKA-16793; Heartbeat API for upgrading ConsumerGroup (#15988) This patch implements the heartbeat api to the members that use the classic protocol in a ConsumerGroup. Reviewers: Jeff Kim , David Jacot --- .../group/GroupCoordinatorService.java | 7 +- .../group/GroupCoordinatorShard.java | 5 +- .../group/GroupMetadataManager.java | 272 ++++++++--- .../group/consumer/ConsumerGroup.java | 23 + .../group/runtime/CoordinatorRuntime.java | 13 + .../group/runtime/CoordinatorTimer.java | 12 + .../group/GroupCoordinatorServiceTest.java | 9 +- .../group/GroupMetadataManagerTest.java | 444 +++++++++++++++++- .../GroupMetadataManagerTestContext.java | 26 +- .../group/MockCoordinatorTimer.java | 13 + .../group/consumer/ConsumerGroupTest.java | 36 ++ .../group/runtime/CoordinatorRuntimeTest.java | 51 ++ 12 files changed, 820 insertions(+), 91 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index 6fe7928dc750c..5e4e899faa681 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -423,12 +423,11 @@ public CompletableFuture heartbeat( ); } - // Using a read operation is okay here as we ignore the last committed offset in the snapshot registry. - // This means we will read whatever is in the latest snapshot, which is how the old coordinator behaves. - return runtime.scheduleReadOperation( + return runtime.scheduleWriteOperation( "classic-group-heartbeat", topicPartitionFor(request.groupId()), - (coordinator, __) -> coordinator.classicGroupHeartbeat(context, request) + Duration.ofMillis(config.offsetCommitTimeoutMs), + coordinator -> coordinator.classicGroupHeartbeat(context, request) ).exceptionally(exception -> handleOperationException( "classic-group-heartbeat", request, diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index c01605926ba71..081c9764944cd 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -355,9 +355,10 @@ public CoordinatorResult classicGroupSync( * @param context The request context. * @param request The actual Heartbeat request. * - * @return The HeartbeatResponse. + * @return A Result containing the heartbeat response and + * a list of records to update the state machine. */ - public HeartbeatResponseData classicGroupHeartbeat( + public CoordinatorResult classicGroupHeartbeat( RequestContext context, HeartbeatRequestData request ) { diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 5520676d21b1d..9dae56ef7a627 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -1273,6 +1273,7 @@ private void throwIfRebalanceInProgress( // member has already rejoined, so it needs to first finish revoking the partitions and the reconciliation, // and then the next rejoin will be triggered automatically if needed. if (group.groupEpoch() > member.memberEpoch() && !member.state().equals(MemberState.UNREVOKED_PARTITIONS)) { + scheduleConsumerGroupJoinTimeoutIfAbsent(group.groupId(), member.memberId(), member.rebalanceTimeoutMs()); throw Errors.REBALANCE_IN_PROGRESS.exception( String.format("A new rebalance is triggered in group %s and member %s should rejoin to catch up.", group.groupId(), member.memberId()) @@ -1753,6 +1754,7 @@ private CoordinatorResult classicGroupJoinToConsumerGro CompletableFuture appendFuture = new CompletableFuture<>(); appendFuture.whenComplete((__, t) -> { if (t == null) { + cancelConsumerGroupJoinTimeout(groupId, response.memberId()); scheduleConsumerGroupSessionTimeout(groupId, response.memberId(), sessionTimeoutMs); // The sync timeout ensures that the member send sync request within the rebalance timeout. scheduleConsumerGroupSyncTimeout(groupId, response.memberId(), request.rebalanceTimeoutMs()); @@ -2077,6 +2079,39 @@ private void scheduleConsumerGroupSessionTimeout( scheduleConsumerGroupSessionTimeout(groupId, memberId, consumerGroupSessionTimeoutMs); } + /** + * Fences a member from a consumer group. Returns an empty CoordinatorResult + * if the group or the member doesn't exist. + * + * @param groupId The group id. + * @param memberId The member id. + * @param reason The reason for fencing the member. + * + * @return The CoordinatorResult to be applied. + */ + private CoordinatorResult consumerGroupFenceMemberOperation( + String groupId, + String memberId, + String reason + ) { + try { + ConsumerGroup group = consumerGroup(groupId); + ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); + log.info("[GroupId {}] Member {} fenced from the group because {}.", + groupId, memberId, reason); + + return consumerGroupFenceMember(group, member, null); + } catch (GroupIdNotFoundException ex) { + log.debug("[GroupId {}] Could not fence {} because the group does not exist.", + groupId, memberId); + } catch (UnknownMemberIdException ex) { + log.debug("[GroupId {}] Could not fence {} because the member does not exist.", + groupId, memberId); + } + + return new CoordinatorResult<>(Collections.emptyList()); + } + /** * Schedules (or reschedules) the session timeout for the member. * @@ -2089,25 +2124,13 @@ private void scheduleConsumerGroupSessionTimeout( String memberId, int sessionTimeoutMs ) { - String key = consumerGroupSessionTimeoutKey(groupId, memberId); - timer.schedule(key, sessionTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { - try { - ConsumerGroup group = consumerGroup(groupId); - ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); - log.info("[GroupId {}] Member {} fenced from the group because its session expired.", - groupId, memberId); - - return consumerGroupFenceMember(group, member, null); - } catch (GroupIdNotFoundException ex) { - log.debug("[GroupId {}] Could not fence {} because the group does not exist.", - groupId, memberId); - } catch (UnknownMemberIdException ex) { - log.debug("[GroupId {}] Could not fence {} because the member does not exist.", - groupId, memberId); - } - - return new CoordinatorResult<>(Collections.emptyList()); - }); + timer.schedule( + consumerGroupSessionTimeoutKey(groupId, memberId), + sessionTimeoutMs, + TimeUnit.MILLISECONDS, + true, + () -> consumerGroupFenceMemberOperation(groupId, memberId, "the member session expired.") + ); } /** @@ -2180,36 +2203,58 @@ private void cancelConsumerGroupRebalanceTimeout( } /** - * Schedules a sync timeout for the member. + * Schedules a join timeout for the member if there's not a join timeout. * * @param groupId The group id. * @param memberId The member id. * @param rebalanceTimeoutMs The rebalance timeout. */ - private void scheduleConsumerGroupSyncTimeout( + private void scheduleConsumerGroupJoinTimeoutIfAbsent( String groupId, String memberId, int rebalanceTimeoutMs ) { - String key = consumerGroupSyncKey(groupId, memberId); - timer.schedule(key, rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, () -> { - try { - ConsumerGroup group = consumerGroup(groupId); - ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); - log.info("[GroupId {}] Member {} fenced from the group because its session expired.", - groupId, memberId); + timer.scheduleIfAbsent( + consumerGroupJoinKey(groupId, memberId), + rebalanceTimeoutMs, + TimeUnit.MILLISECONDS, + true, + () -> consumerGroupFenceMemberOperation(groupId, memberId, "the classic member failed to join within the rebalance timeout.") + ); + } - return consumerGroupFenceMember(group, member, null); - } catch (GroupIdNotFoundException ex) { - log.debug("[GroupId {}] Could not fence {} because the group does not exist.", - groupId, memberId); - } catch (UnknownMemberIdException ex) { - log.debug("[GroupId {}] Could not fence {} because the member does not exist.", - groupId, memberId); - } + /** + * Cancels the join timeout of the member. + * + * @param groupId The group id. + * @param memberId The member id. + */ + private void cancelConsumerGroupJoinTimeout( + String groupId, + String memberId + ) { + timer.cancel(consumerGroupJoinKey(groupId, memberId)); + } - return new CoordinatorResult<>(Collections.emptyList()); - }); + /** + * Schedules a sync timeout for the member. + * + * @param groupId The group id. + * @param memberId The member id. + * @param rebalanceTimeoutMs The rebalance timeout. + */ + private void scheduleConsumerGroupSyncTimeout( + String groupId, + String memberId, + int rebalanceTimeoutMs + ) { + timer.schedule( + consumerGroupSyncKey(groupId, memberId), + rebalanceTimeoutMs, + TimeUnit.MILLISECONDS, + true, + () -> consumerGroupFenceMemberOperation(groupId, memberId, "the member failed to sync within timeout.") + ); } /** @@ -4072,19 +4117,7 @@ private CoordinatorResult classicGroupSyncToConsumerGro String groupId = request.groupId(); String memberId = request.memberId(); String instanceId = request.groupInstanceId(); - - ConsumerGroupMember member; - if (instanceId == null) { - member = group.getOrMaybeCreateMember(request.memberId(), false); - } else { - member = group.staticMember(instanceId); - if (member == null) { - throw new UnknownMemberIdException( - String.format("Member with instance id %s is not a member of group %s.", instanceId, groupId) - ); - } - throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId); - } + ConsumerGroupMember member = validateConsumerGroupMember(group, memberId, instanceId); throwIfMemberDoesNotUseClassicProtocol(member); throwIfGenerationIdUnmatched(member.memberId(), member.memberEpoch(), request.generationId()); @@ -4209,23 +4242,56 @@ private void removePendingSyncMember( * @param context The request context. * @param request The actual Heartbeat request. * - * @return The Heartbeat response. + * @return The coordinator result that contains the heartbeat response. */ - public HeartbeatResponseData classicGroupHeartbeat( + public CoordinatorResult classicGroupHeartbeat( RequestContext context, HeartbeatRequestData request ) { - ClassicGroup group = getOrMaybeCreateClassicGroup(request.groupId(), false); + Group group = groups.get(request.groupId(), Long.MAX_VALUE); + if (group == null) { + throw new UnknownMemberIdException( + String.format("Group %s not found.", request.groupId()) + ); + } + + if (group.type() == CLASSIC) { + return classicGroupHeartbeatToClassicGroup((ClassicGroup) group, context, request); + } else { + return classicGroupHeartbeatToConsumerGroup((ConsumerGroup) group, context, request); + } + } + + /** + * Handle a classic group HeartbeatRequest to a classic group. + * + * @param group The ClassicGroup. + * @param context The request context. + * @param request The actual Heartbeat request. + * + * @return The coordinator result that contains the heartbeat response. + */ + private CoordinatorResult classicGroupHeartbeatToClassicGroup( + ClassicGroup group, + RequestContext context, + HeartbeatRequestData request + ) { validateClassicGroupHeartbeat(group, request.memberId(), request.groupInstanceId(), request.generationId()); switch (group.currentState()) { case EMPTY: - return new HeartbeatResponseData().setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()); + return new CoordinatorResult<>( + Collections.emptyList(), + new HeartbeatResponseData().setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) + ); case PREPARING_REBALANCE: rescheduleClassicGroupMemberHeartbeat(group, group.member(request.memberId())); - return new HeartbeatResponseData().setErrorCode(Errors.REBALANCE_IN_PROGRESS.code()); + return new CoordinatorResult<>( + Collections.emptyList(), + new HeartbeatResponseData().setErrorCode(Errors.REBALANCE_IN_PROGRESS.code()) + ); case COMPLETING_REBALANCE: case STABLE: @@ -4233,7 +4299,10 @@ public HeartbeatResponseData classicGroupHeartbeat( // is in CompletingRebalance state. In this case, we should treat them as // normal heartbeat requests and reset the timer rescheduleClassicGroupMemberHeartbeat(group, group.member(request.memberId())); - return new HeartbeatResponseData(); + return new CoordinatorResult<>( + Collections.emptyList(), + new HeartbeatResponseData() + ); default: throw new IllegalStateException("Reached unexpected state " + @@ -4274,6 +4343,81 @@ private void validateClassicGroupHeartbeat( } } + /** + * Handle a classic group HeartbeatRequest to a consumer group. A response with + * REBALANCE_IN_PROGRESS is returned if 1) the member epoch is smaller than the + * group epoch, 2) the member is in UNREVOKED_PARTITIONS, or 3) the member is in + * UNRELEASED_PARTITIONS and all its partitions pending assignment are free. + * + * @param group The ConsumerGroup. + * @param context The request context. + * @param request The actual Heartbeat request. + * + * @return The coordinator result that contains the heartbeat response. + */ + private CoordinatorResult classicGroupHeartbeatToConsumerGroup( + ConsumerGroup group, + RequestContext context, + HeartbeatRequestData request + ) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException { + String groupId = request.groupId(); + String memberId = request.memberId(); + String instanceId = request.groupInstanceId(); + ConsumerGroupMember member = validateConsumerGroupMember(group, memberId, instanceId); + + throwIfMemberDoesNotUseClassicProtocol(member); + throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), request.generationId()); + + scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicProtocolSessionTimeout().get()); + + Errors error = Errors.NONE; + // The member should rejoin if any of the following conditions is met. + // 1) The group epoch is bumped so the member need to rejoin to catch up. + // 2) The member needs to revoke some partitions and rejoin to reconcile with the new epoch. + // 3) The member's partitions pending assignment are free, so it can rejoin to get the complete assignment. + if (member.memberEpoch() < group.groupEpoch() || + member.state() == MemberState.UNREVOKED_PARTITIONS || + (member.state() == MemberState.UNRELEASED_PARTITIONS && !group.waitingOnUnreleasedPartition(member))) { + error = Errors.REBALANCE_IN_PROGRESS; + scheduleConsumerGroupJoinTimeoutIfAbsent(groupId, memberId, member.rebalanceTimeoutMs()); + } + + return new CoordinatorResult<>( + Collections.emptyList(), + new HeartbeatResponseData().setErrorCode(error.code()) + ); + } + + /** + * Validates that (1) the instance id exists and is mapped to the member id + * if the group instance id is provided; and (2) the member id exists in the group. + * + * @param group The consumer group. + * @param memberId The member id. + * @param instanceId The instance id. + * + * @return The ConsumerGroupMember. + */ + private ConsumerGroupMember validateConsumerGroupMember( + ConsumerGroup group, + String memberId, + String instanceId + ) throws UnknownMemberIdException, FencedInstanceIdException { + ConsumerGroupMember member; + if (instanceId == null) { + member = group.getOrMaybeCreateMember(memberId, false); + } else { + member = group.staticMember(instanceId); + if (member == null) { + throw new UnknownMemberIdException( + String.format("Member with instance id %s is not a member of group %s.", instanceId, group.groupId()) + ); + } + throwIfInstanceIdIsFenced(member, group.groupId(), memberId, instanceId); + } + return member; + } + /** * Handle a classic LeaveGroupRequest. * @@ -4583,6 +4727,20 @@ static String classicGroupSyncKey(String groupId) { return "sync-" + groupId; } + /** + * Generate a consumer group join key for the timer. + * + * Package private for testing. + * + * @param groupId The group id. + * @param memberId The member id. + * + * @return the sync key. + */ + static String consumerGroupJoinKey(String groupId, String memberId) { + return "join-" + groupId + "-" + memberId; + } + /** * Generate a consumer group sync key for the timer. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java index 9e334bb413b77..dd3a6f2e7bfd1 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java @@ -1300,4 +1300,27 @@ public boolean allMembersUseClassicProtocolExcept(String memberId) { return numClassicProtocolMembers() == members().size() - 1 && !getOrMaybeCreateMember(memberId, false).useClassicProtocol(); } + + /** + * Checks whether the member has any unreleased partition. + * + * @param member The member to check. + * @return A boolean indicating whether the member has partitions in the target + * assignment that hasn't been revoked by other members. + */ + public boolean waitingOnUnreleasedPartition(ConsumerGroupMember member) { + if (member.state() == MemberState.UNRELEASED_PARTITIONS) { + for (Map.Entry> entry : targetAssignment().get(member.memberId()).partitions().entrySet()) { + Uuid topicId = entry.getKey(); + Set assignedPartitions = member.assignedPartitions().getOrDefault(topicId, Collections.emptySet()); + + for (int partition : entry.getValue()) { + if (!assignedPartitions.contains(partition) && currentPartitionEpoch(topicId, partition) != -1) { + return true; + } + } + } + } + return false; + } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java index ed6f649d61fcd..c0c194a901042 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java @@ -402,6 +402,19 @@ public void run() { timer.add(task); } + @Override + public void scheduleIfAbsent( + String key, + long delay, + TimeUnit unit, + boolean retry, + TimeoutOperation operation + ) { + if (!tasks.containsKey(key)) { + schedule(key, delay, unit, retry, 500, operation); + } + } + @Override public void cancel(String key) { TimerTask prevTask = tasks.remove(key); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorTimer.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorTimer.java index 4c5b3aa8fd68c..ef5dfc6dc39ad 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorTimer.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorTimer.java @@ -66,6 +66,18 @@ interface TimeoutOperation { */ void schedule(String key, long delay, TimeUnit unit, boolean retry, long retryBackoff, TimeoutOperation operation); + /** + * Add an operation to the timer if there's no operation with the same key. + * + * @param key The key to identify this operation. + * @param delay The delay to wait before expiring. + * @param unit The delay unit. + * @param retry A boolean indicating whether the operation should + * be retried on failure. + * @param operation The operation to perform upon expiration. + */ + void scheduleIfAbsent(String key, long delay, TimeUnit unit, boolean retry, TimeoutOperation operation); + /** * Remove an operation corresponding to a given key. * diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index 731fa5ca08a8f..6cd96458c647d 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -676,9 +676,10 @@ public void testHeartbeat() throws Exception { service.startup(() -> 1); - when(runtime.scheduleReadOperation( + when(runtime.scheduleWriteOperation( ArgumentMatchers.eq("classic-group-heartbeat"), ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any() )).thenReturn(CompletableFuture.completedFuture( new HeartbeatResponseData() @@ -708,9 +709,10 @@ public void testHeartbeatCoordinatorNotAvailableException() throws Exception { service.startup(() -> 1); - when(runtime.scheduleReadOperation( + when(runtime.scheduleWriteOperation( ArgumentMatchers.eq("classic-group-heartbeat"), ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any() )).thenReturn(FutureUtils.failedFuture( new CoordinatorLoadInProgressException(null) @@ -740,9 +742,10 @@ public void testHeartbeatCoordinatorException() throws Exception { service.startup(() -> 1); - when(runtime.scheduleReadOperation( + when(runtime.scheduleWriteOperation( ArgumentMatchers.eq("classic-group-heartbeat"), ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.any() )).thenReturn(FutureUtils.failedFuture( new RebalanceInProgressException() diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 058cdf206db60..41cd5405abb91 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -116,6 +116,7 @@ import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; import static org.apache.kafka.coordinator.group.GroupMetadataManager.appendGroupMetadataErrorToResponseError; import static org.apache.kafka.coordinator.group.GroupMetadataManager.classicGroupJoinKey; +import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupJoinKey; import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupRebalanceTimeoutKey; import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupSessionTimeoutKey; import static org.apache.kafka.coordinator.group.GroupMetadataManager.EMPTY_RESULT; @@ -7452,7 +7453,7 @@ public void testStaticMemberHeartbeatLeaderWithInvalidMemberId() throws Exceptio .setMemberId(rebalanceResult.leaderId) .setGenerationId(rebalanceResult.generationId); - HeartbeatResponseData validHeartbeatResponse = context.sendClassicGroupHeartbeat(heartbeatRequest); + HeartbeatResponseData validHeartbeatResponse = context.sendClassicGroupHeartbeat(heartbeatRequest).response(); assertEquals(Errors.NONE.code(), validHeartbeatResponse.errorCode()); assertThrows(FencedInstanceIdException.class, () -> context.sendClassicGroupHeartbeat( @@ -7513,7 +7514,7 @@ public void testHeartbeatEmptyGroup() { .setMemberId("member-id") .setGenerationId(0); - HeartbeatResponseData heartbeatResponse = context.sendClassicGroupHeartbeat(heartbeatRequest); + HeartbeatResponseData heartbeatResponse = context.sendClassicGroupHeartbeat(heartbeatRequest).response(); assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), heartbeatResponse.errorCode()); } @@ -7560,7 +7561,7 @@ public void testHeartbeatDuringPreparingRebalance() throws Exception { .setGroupId("group-id") .setMemberId(memberId) .setGenerationId(0) - ); + ).response(); assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), heartbeatResponse.errorCode()); } @@ -7586,7 +7587,7 @@ public void testHeartbeatDuringCompletingRebalance() throws Exception { .setGroupId("group-id") .setMemberId(leaderJoinResponse.memberId()) .setGenerationId(leaderJoinResponse.generationId()) - ); + ).response(); assertEquals(new HeartbeatResponseData(), heartbeatResponse); } @@ -7616,7 +7617,7 @@ public void testValidHeartbeat() throws Exception { .setGroupId("group-id") .setMemberId(leaderJoinResponse.memberId()) .setGenerationId(leaderJoinResponse.generationId()) - ); + ).response(); assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode()); } @@ -7653,12 +7654,12 @@ public void testClassicGroupMemberHeartbeatMaintainsSession() throws Exception { .setMemberId(leaderJoinResponse.memberId()) .setGenerationId(leaderJoinResponse.generationId()); - HeartbeatResponseData heartbeatResponse = context.sendClassicGroupHeartbeat(heartbeatRequest); + HeartbeatResponseData heartbeatResponse = context.sendClassicGroupHeartbeat(heartbeatRequest).response(); assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode()); GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(2500)); - heartbeatResponse = context.sendClassicGroupHeartbeat(heartbeatRequest); + heartbeatResponse = context.sendClassicGroupHeartbeat(heartbeatRequest).response(); assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode()); } @@ -7693,7 +7694,7 @@ public void testClassicGroupMemberSessionTimeoutDuringRebalance() throws Excepti .setMemberId(leaderJoinResponse.memberId()) .setGenerationId(leaderJoinResponse.generationId()); - HeartbeatResponseData heartbeatResponse = context.sendClassicGroupHeartbeat(heartbeatRequest); + HeartbeatResponseData heartbeatResponse = context.sendClassicGroupHeartbeat(heartbeatRequest).response(); assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), heartbeatResponse.errorCode()); // Advance clock by first member's session timeout. @@ -7771,7 +7772,7 @@ public void testRebalanceCompletesBeforeMemberJoins() throws Exception { for (int i = 0; i < 2; i++) { GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(2500)); - HeartbeatResponseData heartbeatResponse = context.sendClassicGroupHeartbeat(firstMemberHeartbeatRequest); + HeartbeatResponseData heartbeatResponse = context.sendClassicGroupHeartbeat(firstMemberHeartbeatRequest).response(); assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), heartbeatResponse.errorCode()); } @@ -7813,7 +7814,7 @@ public void testRebalanceCompletesBeforeMemberJoins() throws Exception { firstMemberHeartbeatRequest .setMemberId(otherMemberId) .setGenerationId(2) - ); + ).response(); assertEquals(expectedError.code(), heartbeatResponse.errorCode()); } @@ -7855,7 +7856,7 @@ public void testRebalanceCompletesBeforeMemberJoins() throws Exception { firstMemberHeartbeatRequest .setMemberId(otherMemberId) .setGenerationId(3) - ); + ).response(); assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode()); } @@ -7872,7 +7873,7 @@ public void testSyncGroupEmptyAssignment() throws Exception { .setGroupId("group-id") .setMemberId(leaderJoinResponse.memberId()) .setGenerationId(leaderJoinResponse.generationId()) - ); + ).response(); assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode()); } @@ -7942,7 +7943,7 @@ public void testSecondMemberPartiallyJoinAndTimeout() throws Exception { for (int i = 0; i < 2; i++) { GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(2500)); - HeartbeatResponseData heartbeatResponse = context.sendClassicGroupHeartbeat(heartbeatRequest); + HeartbeatResponseData heartbeatResponse = context.sendClassicGroupHeartbeat(heartbeatRequest).response(); assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode()); } @@ -8203,7 +8204,7 @@ public void testHeartbeatDuringRebalanceCausesRebalanceInProgress() throws Excep .setMemberId(leaderJoinResponse.memberId()) .setGenerationId(leaderJoinResponse.generationId()); - HeartbeatResponseData heartbeatResponse = context.sendClassicGroupHeartbeat(heartbeatRequest); + HeartbeatResponseData heartbeatResponse = context.sendClassicGroupHeartbeat(heartbeatRequest).response(); assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), heartbeatResponse.errorCode()); } @@ -11726,6 +11727,18 @@ public void testReconciliationInJoiningConsumerGroupWithEagerProtocol() throws E .setTopicPartitions(Collections.emptyList()) ); + // Member 1 heartbeats to be notified to rejoin. + assertEquals( + Errors.REBALANCE_IN_PROGRESS.code(), + context.sendClassicGroupHeartbeat( + new HeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setGenerationId(joinResponse1.generationId()) + ).response().errorCode() + ); + context.assertJoinTimeout(groupId, memberId1, 500); + // Member 1 rejoins to transition from UNRELEASED_PARTITIONS to STABLE. GroupMetadataManagerTestContext.JoinResult joinResult2 = context.sendClassicGroupJoin(request); ConsumerGroupMember expectedMember2 = new ConsumerGroupMember.Builder(expectedMember1) @@ -11743,6 +11756,7 @@ public void testReconciliationInJoiningConsumerGroupWithEagerProtocol() throws E assertEquals(expectedMember2.state(), group.getOrMaybeCreateMember(memberId1, false).state()); joinResult2.appendFuture.complete(null); + context.assertNoJoinTimeout(groupId, memberId1); JoinGroupResponseData joinResponse2 = joinResult2.joinFuture.get(); assertEquals( new JoinGroupResponseData() @@ -11943,6 +11957,18 @@ public void testReconciliationInJoiningConsumerGroupWithCooperativeProtocol() th Collections.singletonList(new TopicPartition(fooTopicName, 0)) ); + // Member 1 heartbeats to be notified to rejoin. + assertEquals( + Errors.REBALANCE_IN_PROGRESS.code(), + context.sendClassicGroupHeartbeat( + new HeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setGenerationId(joinResponse1.generationId()) + ).response().errorCode() + ); + context.assertJoinTimeout(groupId, memberId1, 500); + // Member 1 rejoins to transition from UNREVOKED_PARTITIONS to UNRELEASED_PARTITIONS. JoinGroupRequestData request2 = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() .withGroupId(groupId) @@ -11983,6 +12009,7 @@ public void testReconciliationInJoiningConsumerGroupWithCooperativeProtocol() th assertEquals(expectedMember2.state(), group.getOrMaybeCreateMember(memberId1, false).state()); joinResult2.appendFuture.complete(null); + context.assertNoJoinTimeout(groupId, memberId1); JoinGroupResponseData joinResponse2 = joinResult2.joinFuture.get(); assertEquals( new JoinGroupResponseData() @@ -12017,6 +12044,18 @@ public void testReconciliationInJoiningConsumerGroupWithCooperativeProtocol() th .setTopicPartitions(Collections.emptyList()) ); + // Member 1 heartbeats to be notified to rejoin. + assertEquals( + Errors.REBALANCE_IN_PROGRESS.code(), + context.sendClassicGroupHeartbeat( + new HeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setGenerationId(joinResponse2.generationId()) + ).response().errorCode() + ); + context.assertJoinTimeout(groupId, memberId1, 500); + // Member 1 rejoins to transition from UNRELEASED_PARTITIONS to STABLE. JoinGroupRequestData request3 = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() .withGroupId(groupId) @@ -12056,6 +12095,7 @@ public void testReconciliationInJoiningConsumerGroupWithCooperativeProtocol() th assertEquals(expectedMember3.state(), group.getOrMaybeCreateMember(memberId1, false).state()); joinResult3.appendFuture.complete(null); + context.assertNoJoinTimeout(groupId, memberId1); JoinGroupResponseData joinResponse3 = joinResult3.joinFuture.get(); assertEquals( new JoinGroupResponseData() @@ -12143,7 +12183,6 @@ public void testClassicGroupSyncToConsumerGroupWithAllConsumerProtocolVersions() // Consumer group with two members. // Member 1 uses the classic protocol and member 2 uses the consumer protocol. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) @@ -12181,7 +12220,6 @@ public void testClassicGroupSyncToConsumerGroupWithUnknownMemberId() throws Exce // Consumer group with a member that doesn't use the classic protocol. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId) @@ -12231,7 +12269,6 @@ public void testClassicGroupSyncToConsumerGroupWithFencedInstanceId() throws Exc // Consumer group with a static member. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId) @@ -12261,7 +12298,7 @@ public void testClassicGroupSyncToConsumerGroupWithInconsistentGroupProtocol() t .setName("range") .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( new ConsumerPartitionAssignor.Subscription( - Arrays.asList("foo"), + Collections.singletonList("foo"), null, Collections.emptyList() ) @@ -12270,7 +12307,6 @@ public void testClassicGroupSyncToConsumerGroupWithInconsistentGroupProtocol() t // Consumer group with a member using the classic protocol. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId) @@ -12326,7 +12362,7 @@ public void testClassicGroupSyncToConsumerGroupWithIllegalGeneration() throws Ex .setName("range") .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( new ConsumerPartitionAssignor.Subscription( - Arrays.asList("foo"), + Collections.singletonList("foo"), null, Collections.emptyList() ) @@ -12335,7 +12371,6 @@ public void testClassicGroupSyncToConsumerGroupWithIllegalGeneration() throws Ex // Consumer group with a member using the classic protocol. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId) @@ -12369,7 +12404,7 @@ public void testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce .setName("range") .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( new ConsumerPartitionAssignor.Subscription( - Arrays.asList("foo"), + Collections.singletonList("foo"), null, Collections.emptyList() ) @@ -12379,10 +12414,10 @@ public void testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce // Consumer group with a member using the classic protocol. // The group epoch is greater than the member epoch. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 11) .withMember(new ConsumerGroupMember.Builder(memberId) + .setRebalanceTimeoutMs(10000) .setClassicMemberMetadata( new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() .setSessionTimeoutMs(5000) @@ -12401,6 +12436,369 @@ public void testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce .withProtocolName("range") .build()) ); + context.assertJoinTimeout(groupId, memberId, 10000); + } + + @Test + public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession() throws Exception { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + int sessionTimeout = 5000; + + List protocols = Collections.singletonList( + new ConsumerGroupMemberMetadataValue.ClassicProtocol() + .setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( + new ConsumerPartitionAssignor.Subscription( + Collections.singletonList("foo"), + null, + Collections.emptyList() + ) + ))) + ); + + // Consumer group with a member using the classic protocol. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setClassicMemberMetadata( + new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() + .setSessionTimeoutMs(sessionTimeout) + .setSupportedProtocols(protocols) + ) + .setMemberEpoch(10) + .build())) + .build(); + + // Heartbeat to schedule the session timeout. + HeartbeatRequestData request = new HeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setGenerationId(10); + context.sendClassicGroupHeartbeat(request); + context.assertSessionTimeout(groupId, memberId, sessionTimeout); + + // Advance clock by 1/2 of session timeout. + GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout / 2)); + + HeartbeatResponseData heartbeatResponse = context.sendClassicGroupHeartbeat(request).response(); + assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode()); + context.assertSessionTimeout(groupId, memberId, sessionTimeout); + + // Advance clock by 1/2 of session timeout. + GroupMetadataManagerTestContext.assertNoOrEmptyResult(context.sleep(sessionTimeout / 2)); + + heartbeatResponse = context.sendClassicGroupHeartbeat(request).response(); + assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode()); + context.assertSessionTimeout(groupId, memberId, sessionTimeout); + } + + @Test + public void testClassicGroupHeartbeatToConsumerGroupRebalanceInProgress() throws Exception { + String groupId = "group-id"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String memberId3 = Uuid.randomUuid().toString(); + Uuid fooTopicId = Uuid.randomUuid(); + Uuid barTopicId = Uuid.randomUuid(); + int sessionTimeout = 5000; + int rebalanceTimeout = 10000; + + List protocols = Collections.singletonList( + new ConsumerGroupMemberMetadataValue.ClassicProtocol() + .setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( + new ConsumerPartitionAssignor.Subscription( + Collections.singletonList("foo"), + null, + Collections.emptyList() + ) + ))) + ); + + // Member 1 has a member epoch smaller than the group epoch. + ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1) + .setRebalanceTimeoutMs(rebalanceTimeout) + .setClassicMemberMetadata( + new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() + .setSessionTimeoutMs(sessionTimeout) + .setSupportedProtocols(protocols) + ) + .setMemberEpoch(9) + .build(); + + // Member 2 has unrevoked partition. + ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.UNREVOKED_PARTITIONS) + .setRebalanceTimeoutMs(rebalanceTimeout) + .setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 0))) + .setClassicMemberMetadata( + new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() + .setSessionTimeoutMs(sessionTimeout) + .setSupportedProtocols(protocols) + ) + .setMemberEpoch(10) + .build(); + + // Member 3 is in UNRELEASED_PARTITIONS and all the partitions in its target assignment are free. + ConsumerGroupMember member3 = new ConsumerGroupMember.Builder(memberId3) + .setState(MemberState.UNRELEASED_PARTITIONS) + .setRebalanceTimeoutMs(rebalanceTimeout) + .setAssignedPartitions(mkAssignment(mkTopicAssignment(barTopicId, 0))) + .setClassicMemberMetadata( + new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() + .setSessionTimeoutMs(sessionTimeout) + .setSupportedProtocols(protocols) + ) + .setMemberEpoch(10) + .build(); + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(member1) + .withMember(member2) + .withMember(member3) + .withAssignment(memberId3, mkAssignment(mkTopicAssignment(barTopicId, 0, 1, 2)))) + .build(); + + Arrays.asList(memberId1, memberId2, memberId3).forEach(memberId -> { + CoordinatorResult heartbeatResult = context.sendClassicGroupHeartbeat( + new HeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setGenerationId(memberId.equals(memberId1) ? 9 : 10) + ); + assertEquals(Collections.emptyList(), heartbeatResult.records()); + assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), heartbeatResult.response().errorCode()); + context.assertSessionTimeout(groupId, memberId, sessionTimeout); + context.assertJoinTimeout(groupId, memberId, rebalanceTimeout); + }); + } + + @Test + public void testClassicGroupHeartbeatToConsumerWithUnknownMember() { + String groupId = "group-id"; + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)) + .build(); + + assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupHeartbeat( + new HeartbeatRequestData() + .setGroupId(groupId) + .setMemberId("unknown-member-id") + .setGenerationId(10) + )); + + assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupHeartbeat( + new HeartbeatRequestData() + .setGroupId(groupId) + .setMemberId("unknown-member-id") + .setGroupInstanceId("unknown-instance-id") + .setGenerationId(10) + )); + } + + @Test + public void testClassicGroupHeartbeatToConsumerWithFencedInstanceId() { + String groupId = "group-id"; + String memberId = "member-id"; + String instanceId = "instance-id"; + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setInstanceId(instanceId) + .setMemberEpoch(10) + .setClassicMemberMetadata( + new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() + .setSessionTimeoutMs(5000) + .setSupportedProtocols(Collections.emptyList()) + ) + .build())) + .build(); + + assertThrows(FencedInstanceIdException.class, () -> context.sendClassicGroupHeartbeat( + new HeartbeatRequestData() + .setGroupId(groupId) + .setMemberId("unknown-member-id") + .setGroupInstanceId(instanceId) + .setGenerationId(10) + )); + } + + @Test + public void testClassicGroupHeartbeatToConsumerWithIllegalGenerationId() { + String groupId = "group-id"; + String memberId = "member-id"; + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setMemberEpoch(10) + .setClassicMemberMetadata( + new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() + .setSessionTimeoutMs(5000) + .setSupportedProtocols(Collections.emptyList()) + ) + .build())) + .build(); + + assertThrows(IllegalGenerationException.class, () -> context.sendClassicGroupHeartbeat( + new HeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setGenerationId(9) + )); + } + + @Test + public void testClassicGroupHeartbeatToConsumerWithMemberNotUsingClassicProtocol() { + String groupId = "group-id"; + String memberId = "member-id"; + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setMemberEpoch(10) + .build())) + .build(); + + assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupHeartbeat( + new HeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setGenerationId(10) + )); + } + + @Test + public void testConsumerGroupMemberUsingClassicProtocolFencedWhenSessionTimeout() { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + int sessionTimeout = 5000; + + List protocols = Collections.singletonList( + new ConsumerGroupMemberMetadataValue.ClassicProtocol() + .setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( + new ConsumerPartitionAssignor.Subscription( + Collections.singletonList("foo"), + null, + Collections.emptyList() + ) + ))) + ); + + // Consumer group with a member using the classic protocol. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setClassicMemberMetadata( + new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() + .setSessionTimeoutMs(sessionTimeout) + .setSupportedProtocols(protocols) + ) + .setMemberEpoch(10) + .build())) + .build(); + + // Heartbeat to schedule the session timeout. + HeartbeatRequestData request = new HeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setGenerationId(10); + context.sendClassicGroupHeartbeat(request); + context.assertSessionTimeout(groupId, memberId, sessionTimeout); + + // Advance clock by session timeout + 1. + List> timeouts = context.sleep(sessionTimeout + 1); + + // The member is fenced from the group. + assertEquals(1, timeouts.size()); + MockCoordinatorTimer.ExpiredTimeout timeout = timeouts.get(0); + assertEquals(consumerGroupSessionTimeoutKey(groupId, memberId), timeout.key); + assertRecordsEquals( + Arrays.asList( + // The member is removed. + CoordinatorRecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId), + CoordinatorRecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId), + CoordinatorRecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId), + + // The group epoch is bumped. + CoordinatorRecordHelpers.newGroupEpochRecord(groupId, 11) + ), + timeout.result.records() + ); + } + + @Test + public void testConsumerGroupMemberUsingClassicProtocolFencedWhenJoinTimeout() { + String groupId = "group-id"; + String memberId = Uuid.randomUuid().toString(); + int rebalanceTimeout = 500; + + List protocols = Collections.singletonList( + new ConsumerGroupMemberMetadataValue.ClassicProtocol() + .setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription( + new ConsumerPartitionAssignor.Subscription( + Collections.singletonList("foo"), + null, + Collections.emptyList() + ) + ))) + ); + + // Consumer group with a member using the classic protocol whose member epoch is smaller than the group epoch. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setRebalanceTimeoutMs(rebalanceTimeout) + .setClassicMemberMetadata( + new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() + .setSessionTimeoutMs(5000) + .setSupportedProtocols(protocols) + ) + .setMemberEpoch(9) + .build())) + .build(); + + // Heartbeat to schedule the join timeout. + HeartbeatRequestData request = new HeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setGenerationId(9); + assertEquals( + Errors.REBALANCE_IN_PROGRESS.code(), + context.sendClassicGroupHeartbeat(request).response().errorCode() + ); + context.assertSessionTimeout(groupId, memberId, 5000); + context.assertJoinTimeout(groupId, memberId, rebalanceTimeout); + + // Advance clock by rebalance timeout + 1. + List> timeouts = context.sleep(rebalanceTimeout + 1); + + // The member is fenced from the group. + assertEquals(1, timeouts.size()); + MockCoordinatorTimer.ExpiredTimeout timeout = timeouts.get(0); + assertEquals(consumerGroupJoinKey(groupId, memberId), timeout.key); + assertRecordsEquals( + Arrays.asList( + // The member is removed. + CoordinatorRecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId), + CoordinatorRecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId), + CoordinatorRecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId), + + // The group epoch is bumped. + CoordinatorRecordHelpers.newGroupEpochRecord(groupId, 11) + ), + timeout.result.records() + ); } private static void checkJoinGroupResponse( diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java index ce57498b21203..bd7a30c541777 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java @@ -89,6 +89,7 @@ import static org.apache.kafka.coordinator.group.Assertions.assertSyncGroupResponseEquals; import static org.apache.kafka.coordinator.group.GroupMetadataManager.EMPTY_RESULT; import static org.apache.kafka.coordinator.group.GroupMetadataManager.classicGroupHeartbeatKey; +import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupJoinKey; import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupRebalanceTimeoutKey; import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupSessionTimeoutKey; import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupSyncKey; @@ -619,6 +620,27 @@ public void assertNoRebalanceTimeout( assertNull(timeout); } + public MockCoordinatorTimer.ScheduledTimeout assertJoinTimeout( + String groupId, + String memberId, + long delayMs + ) { + MockCoordinatorTimer.ScheduledTimeout timeout = + timer.timeout(consumerGroupJoinKey(groupId, memberId)); + assertNotNull(timeout); + assertEquals(time.milliseconds() + delayMs, timeout.deadlineMs); + return timeout; + } + + public void assertNoJoinTimeout( + String groupId, + String memberId + ) { + MockCoordinatorTimer.ScheduledTimeout timeout = + timer.timeout(consumerGroupJoinKey(groupId, memberId)); + assertNull(timeout); + } + public MockCoordinatorTimer.ScheduledTimeout assertSyncTimeout( String groupId, String memberId, @@ -1112,7 +1134,7 @@ public void verifySessionExpiration(ClassicGroup group, int timeoutMs) { assertEquals(0, group.size()); } - public HeartbeatResponseData sendClassicGroupHeartbeat( + public CoordinatorResult sendClassicGroupHeartbeat( HeartbeatRequestData request ) { RequestContext context = new RequestContext( @@ -1164,7 +1186,7 @@ public void verifyHeartbeat( if (expectedError == Errors.UNKNOWN_MEMBER_ID) { assertThrows(UnknownMemberIdException.class, () -> sendClassicGroupHeartbeat(request)); } else { - HeartbeatResponseData response = sendClassicGroupHeartbeat(request); + HeartbeatResponseData response = sendClassicGroupHeartbeat(request).response(); assertEquals(expectedError.code(), response.errorCode()); } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockCoordinatorTimer.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockCoordinatorTimer.java index 439da4bbf473f..b4942c06dc550 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockCoordinatorTimer.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockCoordinatorTimer.java @@ -130,6 +130,19 @@ public void schedule( schedule(key, delay, unit, retry, 500L, operation); } + @Override + public void scheduleIfAbsent( + String key, + long delay, + TimeUnit unit, + boolean retry, + TimeoutOperation operation + ) { + if (!timeoutMap.containsKey(key)) { + schedule(key, delay, unit, retry, 500L, operation); + } + } + /** * Cancels a timeout. */ diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java index 870eb9d7ec862..4ae14c25439b7 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java @@ -368,6 +368,42 @@ public void testDeletingMemberRemovesPartitionEpoch() { assertEquals(-1, consumerGroup.currentPartitionEpoch(fooTopicId, 9)); } + @Test + public void testWaitingOnUnreleasedPartition() { + Uuid fooTopicId = Uuid.randomUuid(); + Uuid barTopicId = Uuid.randomUuid(); + Uuid zarTopicId = Uuid.randomUuid(); + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + + ConsumerGroup consumerGroup = createConsumerGroup("foo"); + consumerGroup.updateTargetAssignment(memberId1, new Assignment(mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3), + mkTopicAssignment(zarTopicId, 7, 8, 9) + ))); + + ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1) + .setMemberEpoch(10) + .setState(MemberState.UNRELEASED_PARTITIONS) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3))) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(barTopicId, 4, 5, 6))) + .build(); + consumerGroup.updateMember(member1); + + assertFalse(consumerGroup.waitingOnUnreleasedPartition(member1)); + + ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2) + .setMemberEpoch(10) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(zarTopicId, 7))) + .build(); + consumerGroup.updateMember(member2); + + assertTrue(consumerGroup.waitingOnUnreleasedPartition(member1)); + } + @Test public void testGroupState() { Uuid fooTopicId = Uuid.randomUuid(); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java index d874ceae95f00..a8cc200b35922 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java @@ -2505,6 +2505,57 @@ public void testNonRetryableTimer() throws InterruptedException { assertEquals(0, ctx.timer.size()); } + @Test + public void testTimerScheduleIfAbsent() throws InterruptedException { + MockTimer timer = new MockTimer(); + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() + .withTime(timer.time()) + .withTimer(timer) + .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT) + .withLoader(new MockCoordinatorLoader()) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(new MockPartitionWriter()) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) + .withSerializer(new StringSerializer()) + .build(); + + // Loads the coordinator. + runtime.scheduleLoadOperation(TP, 10); + + // Check initial state. + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + assertEquals(0, ctx.timer.size()); + + // Timer #1. + AtomicInteger cnt = new AtomicInteger(0); + ctx.timer.scheduleIfAbsent("timer-1", 10, TimeUnit.MILLISECONDS, false, () -> { + cnt.incrementAndGet(); + throw new KafkaException("error"); + }); + + // The coordinator timer should have one pending task. + assertEquals(1, ctx.timer.size()); + + // Advance half of the time to fire the pending timer. + timer.advanceClock(10 / 2); + + // Reschedule timer #1. Since the timer already exists, the timeout shouldn't be refreshed. + ctx.timer.scheduleIfAbsent("timer-1", 10, TimeUnit.MILLISECONDS, false, () -> { + cnt.incrementAndGet(); + throw new KafkaException("error"); + }); + + // Advance the time to fire the pending timer. + timer.advanceClock(10 / 2 + 1); + + // The timer should have been called and the timer should have no pending tasks. + assertEquals(1, cnt.get()); + assertEquals(0, ctx.timer.size()); + } + @Test public void testStateChanges() throws Exception { MockTimer timer = new MockTimer();