diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala index 696e6534bf047..afc934ac0bb68 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala @@ -96,7 +96,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator topic = "foo", partition = 0, offset = 100L, - expectedError = Errors.NONE, + expectedError = if (useNewProtocol && version < 9) Errors.UNSUPPORTED_VERSION else Errors.NONE, version = version.toShort ) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java index f0d930648e72f..acadf6d59259d 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java @@ -100,12 +100,15 @@ public static GroupType parse(String name) { * @param generationIdOrMemberEpoch The generation id for genetic groups or the member epoch * for consumer groups. * @param isTransactional Whether the offset commit is transactional or not. + * @param apiVersion The api version. */ void validateOffsetCommit( String memberId, String groupInstanceId, int generationIdOrMemberEpoch, - boolean isTransactional + boolean isTransactional, + short apiVersion + ) throws KafkaException; /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java index 01c861c038655..9e2bc6c62a7f0 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java @@ -325,24 +325,13 @@ private Group validateOffsetCommit( } } - try { - group.validateOffsetCommit( - request.memberId(), - request.groupInstanceId(), - request.generationIdOrMemberEpoch(), - false - ); - } catch (StaleMemberEpochException ex) { - // The STALE_MEMBER_EPOCH error is only returned for new consumer group (KIP-848). When - // it is, the member should be using the OffsetCommit API version >= 9. As we don't - // support upgrading from the old to the new protocol yet, we return UNSUPPORTED_VERSION - // error if an older version is used. We will revise this when the upgrade path is implemented. - if (context.header.apiVersion() >= 9) { - throw ex; - } else { - throw Errors.UNSUPPORTED_VERSION.exception(); - } - } + group.validateOffsetCommit( + request.memberId(), + request.groupInstanceId(), + request.generationIdOrMemberEpoch(), + false, + context.apiVersion() + ); return group; } @@ -350,9 +339,11 @@ private Group validateOffsetCommit( /** * Validates an TxnOffsetCommit request. * + * @param context The request context. * @param request The actual request. */ private Group validateTransactionalOffsetCommit( + RequestContext context, TxnOffsetCommitRequestData request ) throws ApiException { Group group; @@ -375,7 +366,8 @@ private Group validateTransactionalOffsetCommit( request.memberId(), request.groupInstanceId(), request.generationId(), - true + true, + context.apiVersion() ); } catch (StaleMemberEpochException ex) { throw Errors.ILLEGAL_GENERATION.exception(); @@ -530,7 +522,7 @@ public CoordinatorResult commitT RequestContext context, TxnOffsetCommitRequestData request ) throws ApiException { - validateTransactionalOffsetCommit(request); + validateTransactionalOffsetCommit(context, request); final TxnOffsetCommitResponseData response = new TxnOffsetCommitResponseData(); final List records = new ArrayList<>(); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java index c9f5bc75ca98f..dd0d5c15fd62f 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java @@ -824,13 +824,15 @@ public void validateMember( * @param groupInstanceId The group instance id. * @param generationId The generation id. * @param isTransactional Whether the offset commit is transactional or not. + * @param apiVersion The api version. */ @Override public void validateOffsetCommit( String memberId, String groupInstanceId, int generationId, - boolean isTransactional + boolean isTransactional, + short apiVersion ) throws CoordinatorNotAvailableException, UnknownMemberIdException, IllegalGenerationException, FencedInstanceIdException { if (isInState(DEAD)) { throw Errors.COORDINATOR_NOT_AVAILABLE.exception(); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java index bbc544289b26b..98f37a7ed6cae 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java @@ -21,8 +21,10 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.IllegalGenerationException; import org.apache.kafka.common.errors.StaleMemberEpochException; import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.protocol.Errors; @@ -792,21 +794,36 @@ public DeadlineAndEpoch metadataRefreshDeadline() { * @param memberEpoch The member epoch. * @param isTransactional Whether the offset commit is transactional or not. It has no * impact when a consumer group is used. + * @param apiVersion The api version. + * @throws UnknownMemberIdException If the member is not found. + * @throws StaleMemberEpochException If the member uses the consumer protocol and the provided + * member epoch doesn't match the actual member epoch. + * @throws IllegalGenerationException If the member uses the classic protocol and the provided + * generation id is not equal to the member epoch. */ @Override public void validateOffsetCommit( String memberId, String groupInstanceId, int memberEpoch, - boolean isTransactional - ) throws UnknownMemberIdException, StaleMemberEpochException { + boolean isTransactional, + short apiVersion + ) throws UnknownMemberIdException, StaleMemberEpochException, IllegalGenerationException { // When the member epoch is -1, the request comes from either the admin client // or a consumer which does not use the group management facility. In this case, // the request can commit offsets if the group is empty. if (memberEpoch < 0 && members().isEmpty()) return; final ConsumerGroupMember member = getOrMaybeCreateMember(memberId, false); - validateMemberEpoch(memberEpoch, member.memberEpoch()); + + // If the commit is not transactional and the member uses the new consumer protocol (KIP-848), + // the member should be using the OffsetCommit API version >= 9. + if (!isTransactional && !member.useClassicProtocol() && apiVersion < 9) { + throw new UnsupportedVersionException("OffsetCommit version 9 or above must be used " + + "by members using the consumer group protocol"); + } + + validateMemberEpoch(memberEpoch, member.memberEpoch(), member.useClassicProtocol()); } /** @@ -815,13 +832,18 @@ public void validateOffsetCommit( * @param memberId The member id for consumer groups. * @param memberEpoch The member epoch for consumer groups. * @param lastCommittedOffset The last committed offsets in the timeline. + * @throws UnknownMemberIdException If the member is not found. + * @throws StaleMemberEpochException If the member uses the consumer protocol and the provided + * member epoch doesn't match the actual member epoch. + * @throws IllegalGenerationException If the member uses the classic protocol and the provided + * generation id is not equal to the member epoch. */ @Override public void validateOffsetFetch( String memberId, int memberEpoch, long lastCommittedOffset - ) throws UnknownMemberIdException, StaleMemberEpochException { + ) throws UnknownMemberIdException, StaleMemberEpochException, IllegalGenerationException { // When the member id is null and the member epoch is -1, the request either comes // from the admin client or from a client which does not provide them. In this case, // the fetch request is accepted. @@ -832,7 +854,7 @@ public void validateOffsetFetch( throw new UnknownMemberIdException(String.format("Member %s is not a member of group %s.", memberId, groupId)); } - validateMemberEpoch(memberEpoch, member.memberEpoch()); + validateMemberEpoch(memberEpoch, member.memberEpoch(), member.useClassicProtocol()); } /** @@ -896,16 +918,27 @@ public boolean isInStates(Set statesFilter, long committedOffset) { } /** - * Throws a StaleMemberEpochException if the received member epoch does not match - * the expected member epoch. + * Throws an exception if the received member epoch does not match the expected member epoch. + * + * @param receivedMemberEpoch The received member epoch or generation id. + * @param expectedMemberEpoch The expected member epoch. + * @param useClassicProtocol The boolean indicating whether the checked member uses the classic protocol. + * @throws StaleMemberEpochException if the member with unmatched member epoch uses the consumer protocol. + * @throws IllegalGenerationException if the member with unmatched generation id uses the classic protocol. */ private void validateMemberEpoch( int receivedMemberEpoch, - int expectedMemberEpoch - ) throws StaleMemberEpochException { + int expectedMemberEpoch, + boolean useClassicProtocol + ) throws StaleMemberEpochException, IllegalGenerationException { if (receivedMemberEpoch != expectedMemberEpoch) { - throw new StaleMemberEpochException(String.format("The received member epoch %d does not match " - + "the expected member epoch %d.", receivedMemberEpoch, expectedMemberEpoch)); + if (useClassicProtocol) { + throw new IllegalGenerationException(String.format("The received generation id %d does not match " + + "the expected member epoch %d.", receivedMemberEpoch, expectedMemberEpoch)); + } else { + throw new StaleMemberEpochException(String.format("The received member epoch %d does not match " + + "the expected member epoch %d.", receivedMemberEpoch, expectedMemberEpoch)); + } } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java index e2684a7cab0ae..6b328d66806d7 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java @@ -24,7 +24,6 @@ import org.apache.kafka.common.errors.RebalanceInProgressException; import org.apache.kafka.common.errors.StaleMemberEpochException; import org.apache.kafka.common.errors.UnknownMemberIdException; -import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.JoinGroupRequestData; import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.OffsetCommitResponseData; @@ -51,6 +50,7 @@ import org.apache.kafka.coordinator.group.assignor.RangeAssignor; import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; import org.apache.kafka.coordinator.group.classic.ClassicGroup; @@ -1142,14 +1142,8 @@ public void testConsumerGroupOffsetCommitWithStaleMemberEpoch() { assertThrows(StaleMemberEpochException.class, () -> context.commitOffset(request)); } - @ParameterizedTest - @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) - public void testConsumerGroupOffsetCommitWithVersionSmallerThanVersion9(short version) { - // All the newer versions are fine. - if (version >= 9) return; - // Version 0 does not support MemberId and GenerationIdOrMemberEpoch fields. - if (version == 0) return; - + @Test + public void testConsumerGroupOffsetCommitWithIllegalGenerationId() { OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); // Create an empty group. @@ -1162,27 +1156,30 @@ public void testConsumerGroupOffsetCommitWithVersionSmallerThanVersion9(short ve group.updateMember(new ConsumerGroupMember.Builder("member") .setMemberEpoch(10) .setPreviousMemberEpoch(10) + .setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()) .build() ); - // Verify that the request is rejected with the correct exception. - assertThrows(UnsupportedVersionException.class, () -> context.commitOffset( - version, - new OffsetCommitRequestData() - .setGroupId("foo") - .setMemberId("member") - .setGenerationIdOrMemberEpoch(9) - .setTopics(Collections.singletonList( - new OffsetCommitRequestData.OffsetCommitRequestTopic() - .setName("bar") - .setPartitions(Collections.singletonList( - new OffsetCommitRequestData.OffsetCommitRequestPartition() - .setPartitionIndex(0) - .setCommittedOffset(100L) - )) - )) - ) - ); + OffsetCommitRequestData request = new OffsetCommitRequestData() + .setGroupId("foo") + .setMemberId("member") + .setGenerationIdOrMemberEpoch(9) + .setTopics(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + )) + )); + + // Verify that a smaller epoch is rejected. + assertThrows(IllegalGenerationException.class, () -> context.commitOffset(request)); + + // Verify that a larger epoch is rejected. + request.setGenerationIdOrMemberEpoch(11); + assertThrows(IllegalGenerationException.class, () -> context.commitOffset(request)); } @Test @@ -2294,6 +2291,30 @@ public void testConsumerGroupOffsetFetchWithStaleMemberEpoch() { () -> context.fetchAllOffsets("group", "member", 10, Long.MAX_VALUE)); } + @Test + public void testConsumerGroupOffsetFetchWithIllegalGenerationId() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true); + group.updateMember(new ConsumerGroupMember.Builder("member") + .setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()) + .build() + ); + + List topics = Collections.singletonList( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("foo") + .setPartitionIndexes(Collections.singletonList(0)) + ); + + // Fetch offsets case. + assertThrows(IllegalGenerationException.class, + () -> context.fetchOffsets("group", "member", 10, topics, Long.MAX_VALUE)); + + // Fetch all offsets case. + assertThrows(IllegalGenerationException.class, + () -> context.fetchAllOffsets("group", "member", 10, Long.MAX_VALUE)); + } + @Test public void testGenericGroupOffsetDelete() { OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java index 14327ae9672c1..76ab8467385b8 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java @@ -30,10 +30,12 @@ import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection; import org.apache.kafka.common.message.JoinGroupResponseData; import org.apache.kafka.common.message.SyncGroupResponseData; +import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource; import org.apache.kafka.coordinator.group.OffsetAndMetadata; import org.apache.kafka.coordinator.group.OffsetExpirationCondition; import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl; @@ -41,6 +43,7 @@ import org.apache.kafka.timeline.SnapshotRegistry; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; import java.util.ArrayList; import java.util.Arrays; @@ -987,10 +990,11 @@ public void testMaybeElectNewJoinedLeaderChooseExisting() { assertTrue(group.isLeader(memberId)); } - @Test - public void testValidateOffsetCommit() { + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) + public void testValidateOffsetCommit(short version) { // A call from the admin client without any parameters should pass. - group.validateOffsetCommit("", "", -1, false); + group.validateOffsetCommit("", "", -1, false, version); // Add a member. group.add(new ClassicGroupMember( @@ -1012,40 +1016,40 @@ public void testValidateOffsetCommit() { // No parameters and the group is not empty. assertThrows(UnknownMemberIdException.class, - () -> group.validateOffsetCommit("", "", -1, false)); + () -> group.validateOffsetCommit("", "", -1, false, version)); // A transactional offset commit without any parameters // and a non-empty group is accepted. - group.validateOffsetCommit("", null, -1, true); + group.validateOffsetCommit("", null, -1, true, version); // The member id does not exist. assertThrows(UnknownMemberIdException.class, - () -> group.validateOffsetCommit("unknown", "unknown", -1, false)); + () -> group.validateOffsetCommit("unknown", "unknown", -1, false, version)); // The instance id does not exist. assertThrows(UnknownMemberIdException.class, - () -> group.validateOffsetCommit("member-id", "unknown", -1, false)); + () -> group.validateOffsetCommit("member-id", "unknown", -1, false, version)); // The generation id is invalid. assertThrows(IllegalGenerationException.class, - () -> group.validateOffsetCommit("member-id", "instance-id", 0, false)); + () -> group.validateOffsetCommit("member-id", "instance-id", 0, false, version)); // Group is in prepare rebalance state. assertThrows(RebalanceInProgressException.class, - () -> group.validateOffsetCommit("member-id", "instance-id", 1, false)); + () -> group.validateOffsetCommit("member-id", "instance-id", 1, false, version)); // Group transitions to stable. group.transitionTo(STABLE); // This should work. - group.validateOffsetCommit("member-id", "instance-id", 1, false); + group.validateOffsetCommit("member-id", "instance-id", 1, false, version); // Replace static member. group.replaceStaticMember("instance-id", "member-id", "new-member-id"); // The old instance id should be fenced. assertThrows(FencedInstanceIdException.class, - () -> group.validateOffsetCommit("member-id", "instance-id", 1, false)); + () -> group.validateOffsetCommit("member-id", "instance-id", 1, false, version)); // Remove member and transitions to dead. group.remove("new-instance-id"); @@ -1053,7 +1057,7 @@ public void testValidateOffsetCommit() { // This should fail with CoordinatorNotAvailableException. assertThrows(CoordinatorNotAvailableException.class, - () -> group.validateOffsetCommit("member-id", "new-instance-id", 1, false)); + () -> group.validateOffsetCommit("member-id", "new-instance-id", 1, false, version)); } @Test diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java index a483fa3be022c..a67a5b098b94d 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java @@ -20,11 +20,15 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.GroupNotEmptyException; +import org.apache.kafka.common.errors.IllegalGenerationException; import org.apache.kafka.common.errors.StaleMemberEpochException; import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; +import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource; import org.apache.kafka.coordinator.group.Group; import org.apache.kafka.coordinator.group.MetadataImageBuilder; import org.apache.kafka.coordinator.group.OffsetAndMetadata; @@ -36,7 +40,6 @@ import org.apache.kafka.timeline.SnapshotRegistry; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; import java.util.ArrayList; import java.util.Arrays; @@ -1017,31 +1020,81 @@ public void testMetadataRefreshDeadline() { } @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void testValidateOffsetCommit(boolean isTransactional) { + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) + public void testValidateTransactionalOffsetCommit(short version) { + boolean isTransactional = true; ConsumerGroup group = createConsumerGroup("group-foo"); // Simulate a call from the admin client without member id and member epoch. // This should pass only if the group is empty. - group.validateOffsetCommit("", "", -1, isTransactional); + group.validateOffsetCommit("", "", -1, isTransactional, version); // The member does not exist. assertThrows(UnknownMemberIdException.class, () -> - group.validateOffsetCommit("member-id", null, 0, isTransactional)); + group.validateOffsetCommit("member-id", null, 0, isTransactional, version)); // Create a member. group.updateMember(new ConsumerGroupMember.Builder("member-id").build()); // A call from the admin client should fail as the group is not empty. assertThrows(UnknownMemberIdException.class, () -> - group.validateOffsetCommit("", "", -1, isTransactional)); + group.validateOffsetCommit("", "", -1, isTransactional, version)); // The member epoch is stale. assertThrows(StaleMemberEpochException.class, () -> - group.validateOffsetCommit("member-id", "", 10, isTransactional)); + group.validateOffsetCommit("member-id", "", 10, isTransactional, version)); // This should succeed. - group.validateOffsetCommit("member-id", "", 0, isTransactional); + group.validateOffsetCommit("member-id", "", 0, isTransactional, version); + } + + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) + public void testValidateOffsetCommit(short version) { + boolean isTransactional = false; + ConsumerGroup group = createConsumerGroup("group-foo"); + + // Simulate a call from the admin client without member id and member epoch. + // This should pass only if the group is empty. + group.validateOffsetCommit("", "", -1, isTransactional, version); + + // The member does not exist. + assertThrows(UnknownMemberIdException.class, () -> + group.validateOffsetCommit("member-id", null, 0, isTransactional, version)); + + // Create members. + group.updateMember( + new ConsumerGroupMember + .Builder("new-protocol-member-id").build() + ); + group.updateMember( + new ConsumerGroupMember.Builder("old-protocol-member-id") + .setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()) + .build() + ); + + // A call from the admin client should fail as the group is not empty. + assertThrows(UnknownMemberIdException.class, () -> + group.validateOffsetCommit("", "", -1, isTransactional, version)); + + // The member epoch is stale. + if (version >= 9) { + assertThrows(StaleMemberEpochException.class, () -> + group.validateOffsetCommit("new-protocol-member-id", "", 10, isTransactional, version)); + } else { + assertThrows(UnsupportedVersionException.class, () -> + group.validateOffsetCommit("new-protocol-member-id", "", 10, isTransactional, version)); + } + assertThrows(IllegalGenerationException.class, () -> + group.validateOffsetCommit("old-protocol-member-id", "", 10, isTransactional, version)); + + // This should succeed. + if (version >= 9) { + group.validateOffsetCommit("new-protocol-member-id", "", 0, isTransactional, version); + } else { + assertThrows(UnsupportedVersionException.class, () -> + group.validateOffsetCommit("new-protocol-member-id", "", 0, isTransactional, version)); + } } @Test