From 31f79055cebcc88d866dcd8eb331fd8b85557be9 Mon Sep 17 00:00:00 2001
From: David Jacot
Date: Tue, 10 Sep 2024 16:28:36 +0200
Subject: [PATCH] KAFKA-17306; Soften the validation when replaying tombstones (#16898)

This patch fixes a few bugs in the replay logic of the consumer group records:

* The first issue is that the logic assumed that the group or the member exists when tombstones are replayed. Obviously, this is incorrect after a restart: the group or the member may no longer be there if the __consumer_offsets partition contains only tombstones for it. The patch fixes this by treating tombstones as no-ops when the entity does not exist.
* The second issue is that the logic assumed that consumer group records always appear in a specific order in the log, so it only allowed a consumer group to be created when a `ConsumerGroupMemberMetadata` record was replayed. This is incorrect too: during the lifetime of a consumer group, the records may appear in different orders. The patch fixes this by allowing any record to create the consumer group.
* The third issue is that offset commit records for a consumer group may be replayed before the records that create the group itself. By default, the OffsetMetadataManager creates a simple classic group to hold those offset commits. When the consumer group records are finally replayed, the logic fails because a classic group already exists. The patch fixes this by converting the simple classic group to a consumer group when records for a consumer group are replayed.

All those combinations are hard to cover with unit tests, so this patch adds integration tests that reproduce some of these record interleavings. I used them to reproduce the issues described above.
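The intended replay semantics can be summarized with a minimal, self-contained sketch. The types and method below are hypothetical stand-ins, not the real coordinator API; the actual logic lives in GroupMetadataManager#getOrMaybeCreatePersistedConsumerGroup and the replay methods changed by this patch.

    import java.util.HashMap;
    import java.util.Map;

    final class ReplaySemanticsSketch {
        interface Group {}
        // Hypothetical placeholder created by the offset manager to hold committed offsets.
        static final class SimpleClassicGroup implements Group {}
        static final class ConsumerGroup implements Group {
            final Map<String, Integer> memberEpochs = new HashMap<>();
        }

        final Map<String, Group> groups = new HashMap<>();

        // A null epoch models a tombstone, mirroring how the patched replay methods
        // treat a null record value.
        void replayMemberRecord(String groupId, String memberId, Integer memberEpoch) {
            Group group = groups.get(groupId);
            if (memberEpoch == null) {
                // Rule 1: a tombstone for a missing group or member is a no-op.
                if (!(group instanceof ConsumerGroup)) return;
                ((ConsumerGroup) group).memberEpochs.remove(memberId);
            } else {
                // Rule 2: any non-tombstone record may create the consumer group.
                // Rule 3: a simple classic group placeholder is replaced, not rejected.
                if (group == null || group instanceof SimpleClassicGroup) {
                    group = new ConsumerGroup();
                    groups.put(groupId, group);
                } else if (!(group instanceof ConsumerGroup)) {
                    throw new IllegalStateException("Group " + groupId + " is not a consumer group");
                }
                ((ConsumerGroup) group).memberEpochs.put(memberId, memberEpoch);
            }
        }

        public static void main(String[] args) {
            ReplaySemanticsSketch sketch = new ReplaySemanticsSketch();
            sketch.replayMemberRecord("grp", "m1", null);       // tombstone first: silently ignored
            sketch.groups.put("grp", new SimpleClassicGroup()); // offsets replayed before group records
            sketch.replayMemberRecord("grp", "m1", 5);          // placeholder replaced, member added
            System.out.println(sketch.groups.get("grp") instanceof ConsumerGroup); // prints: true
        }
    }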
Reviewers: TengYao Chi, Jeff Kim, Justine Olshan, Chia-Ping Tsai --- .../group/CoordinatorLoaderImpl.scala | 55 ++- .../api/GroupCoordinatorIntegrationTest.scala | 339 ++++++++++++++++-- .../scala/unit/kafka/utils/TestUtils.scala | 4 +- .../group/GroupMetadataManager.java | 122 +++++-- .../group/classic/ClassicGroup.java | 7 + .../coordinator/group/modern/ModernGroup.java | 4 +- .../group/modern/consumer/ConsumerGroup.java | 3 +- .../group/modern/share/ShareGroup.java | 3 +- .../group/GroupMetadataManagerTest.java | 169 ++++++++- 9 files changed, 618 insertions(+), 88 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala index d7033931cd03..3067e98fbe29 100644 --- a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala +++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala @@ -140,12 +140,20 @@ class CoordinatorLoaderImpl[T]( batch.asScala.foreach { record => val controlRecord = ControlRecordType.parse(record.key) if (controlRecord == ControlRecordType.COMMIT) { + if (isTraceEnabled) { + trace(s"Replaying end transaction marker from $tp at offset ${record.offset} to commit transaction " + + s"with producer id ${batch.producerId} and producer epoch ${batch.producerEpoch}.") + } coordinator.replayEndTransactionMarker( batch.producerId, batch.producerEpoch, TransactionResult.COMMIT ) } else if (controlRecord == ControlRecordType.ABORT) { + if (isTraceEnabled) { + trace(s"Replaying end transaction marker from $tp at offset ${record.offset} to abort transaction " + + s"with producer id ${batch.producerId} and producer epoch ${batch.producerEpoch}.") + } coordinator.replayEndTransactionMarker( batch.producerId, batch.producerEpoch, @@ -156,17 +164,42 @@ } else { batch.asScala.foreach { record => numRecords = numRecords + 1 - try { - coordinator.replay( - record.offset(), - batch.producerId, - batch.producerEpoch, - deserializer.deserialize(record.key, record.value) - ) - } catch { - case ex: UnknownRecordTypeException => - warn(s"Unknown record type ${ex.unknownType} while loading offsets and group metadata " + - s"from $tp. Ignoring it. It could be a left over from an aborted upgrade.") + + val coordinatorRecordOpt = { + try { + Some(deserializer.deserialize(record.key, record.value)) + } catch { + case ex: UnknownRecordTypeException => + warn(s"Unknown record type ${ex.unknownType} while loading offsets and group metadata " + + s"from $tp. Ignoring it. 
It could be a left over from an aborted upgrade.") + None + case ex: RuntimeException => + val msg = s"Deserializing record $record from $tp failed due to: ${ex.getMessage}" + error(s"$msg.") + throw new RuntimeException(msg, ex) + } + } + + coordinatorRecordOpt.foreach { coordinatorRecord => + try { + if (isTraceEnabled) { + trace(s"Replaying record $coordinatorRecord from $tp at offset ${record.offset()} " + + s"with producer id ${batch.producerId} and producer epoch ${batch.producerEpoch}.") + } + coordinator.replay( + record.offset(), + batch.producerId, + batch.producerEpoch, + coordinatorRecord + ) + } catch { + case ex: RuntimeException => + val msg = s"Replaying record $coordinatorRecord from $tp at offset ${record.offset()} " + + s"with producer id ${batch.producerId} and producer epoch ${batch.producerEpoch} " + + s"failed due to: ${ex.getMessage}" + error(s"$msg.") + throw new RuntimeException(msg, ex) + } } } } diff --git a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala index 2b92750883e1..a666a740a147 100644 --- a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala @@ -12,53 +12,320 @@ */ package kafka.api -import kafka.integration.KafkaServerTestHarness import kafka.log.UnifiedLog -import kafka.server.KafkaConfig +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type} +import kafka.test.junit.ClusterTestExtensions import kafka.utils.TestUtils -import org.apache.kafka.clients.consumer.OffsetAndMetadata -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.clients.admin.{Admin, ConsumerGroupDescription} +import org.apache.kafka.clients.consumer.{Consumer, GroupProtocol, OffsetAndMetadata} +import org.apache.kafka.common.{ConsumerGroupState, GroupType, KafkaFuture, TopicPartition} import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.ValueSource import scala.jdk.CollectionConverters._ -import java.util.Properties import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.record.CompressionType import org.apache.kafka.coordinator.group.GroupCoordinatorConfig +import org.apache.kafka.server.config.ServerConfigs +import org.junit.jupiter.api.Timeout +import org.junit.jupiter.api.extension.ExtendWith -class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness { - val offsetsTopicCompressionCodec = CompressionType.GZIP - val overridingProps = new Properties() - overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1") - overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, offsetsTopicCompressionCodec.id.toString) +import java.time.Duration +import java.util.Collections +import java.util.concurrent.TimeUnit - override def generateConfigs = TestUtils.createBrokerConfigs(1, zkConnectOrNull, enableControlledShutdown = false).map { - KafkaConfig.fromProps(_, overridingProps) +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +class GroupCoordinatorIntegrationTest(cluster: ClusterInstance) { + + @ClusterTest( + types = Array(Type.KRAFT, Type.ZK), + serverProperties = Array( + new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), + new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, value = "1"), + new ClusterConfigProperty(key = ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, value = "false"), + ) + ) + def testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec(): Unit = { + withConsumer(groupId = "group", groupProtocol = GroupProtocol.CLASSIC) { consumer => + consumer.commitSync(Map( + new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) -> new OffsetAndMetadata(10, "") + ).asJava) + + val logManager = cluster.brokers().asScala.head._2.logManager + def getGroupMetadataLogOpt: Option[UnifiedLog] = + logManager.getLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)) + + TestUtils.waitUntilTrue(() => getGroupMetadataLogOpt.exists(_.logSegments.asScala.exists(_.log.batches.asScala.nonEmpty)), + "Commit message not appended in time") + + val logSegments = getGroupMetadataLogOpt.get.logSegments.asScala + val incorrectCompressionCodecs = logSegments + .flatMap(_.log.batches.asScala.map(_.compressionType)) + .filter(_ != CompressionType.GZIP) + + assertEquals(Seq.empty, incorrectCompressionCodecs, "Incorrect compression codecs should be empty") + } + } + + @ClusterTest( + types = Array(Type.KRAFT), + serverProperties = Array( + new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") + ) + ) + def testCoordinatorFailoverAfterCompactingPartitionWithConsumerGroupMemberJoiningAndLeaving(): Unit = { + withAdmin { admin => + TestUtils.createTopicWithAdminRaw( + admin = admin, + topic = "foo", + numPartitions = 3 + ) + + // Create a consumer group grp1 with one member. The member subscribes to foo and leaves. This creates + // a mix of group records with tombstones to delete the member. + withConsumer(groupId = "grp1", groupProtocol = GroupProtocol.CONSUMER) { consumer => + consumer.subscribe(List("foo").asJava) + TestUtils.waitUntilTrue(() => { + consumer.poll(Duration.ofMillis(50)) + consumer.assignment.asScala.nonEmpty + }, msg = "Consumer did not get a non-empty assignment") + } + } + + // Force a compaction. + rollAndCompactConsumerOffsets() + + // Restart the broker to reload the group coordinator. + cluster.shutdownBroker(0) + cluster.startBroker(0) + + // Verify the state of the groups to ensure that the group coordinator + // was correctly loaded. If replaying any of the records fails, the + // group coordinator won't be available. + withAdmin { admin => + val groups = admin + .describeConsumerGroups(List("grp1").asJava) + .describedGroups() + .asScala + .toMap + + assertDescribedGroup(groups, "grp1", GroupType.CONSUMER, ConsumerGroupState.EMPTY) + } + } + + @ClusterTest( + types = Array(Type.KRAFT), + serverProperties = Array( + new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") + ) + ) + def testCoordinatorFailoverCompactingPartitionWithManualOffsetCommitsAndConsumerGroupMemberUnsubscribingAndResubscribing(): Unit = { + withAdmin { admin => + TestUtils.createTopicWithAdminRaw( + admin = admin, + topic = "foo", + numPartitions = 3 + ) + + // Create a consumer group grp2 with one member. 
The member subscribes to foo, manually commits offsets, + // unsubscribes and finally re-subscribes to foo. This creates a mix of group records with tombstones + // and ensures that all the offset commit records are before the consumer group records due to the + // rebalance after the commit sync. + withConsumer(groupId = "grp2", groupProtocol = GroupProtocol.CONSUMER, enableAutoCommit = false) { consumer => + consumer.subscribe(List("foo").asJava) + TestUtils.waitUntilTrue(() => { + consumer.poll(Duration.ofMillis(50)) + consumer.assignment().asScala.nonEmpty + }, msg = "Consumer did not get a non-empty assignment") + consumer.commitSync() + consumer.unsubscribe() + consumer.subscribe(List("foo").asJava) + TestUtils.waitUntilTrue(() => { + consumer.poll(Duration.ofMillis(50)) + consumer.assignment().asScala.nonEmpty + }, msg = "Consumer did not get a non-empty assignment") + } + } + + // Force a compaction. + rollAndCompactConsumerOffsets() + + // Restart the broker to reload the group coordinator. + cluster.shutdownBroker(0) + cluster.startBroker(0) + + // Verify the state of the groups to ensure that the group coordinator + // was correctly loaded. If replaying any of the records fails, the + // group coordinator won't be available. + withAdmin { admin => + val groups = admin + .describeConsumerGroups(List("grp2").asJava) + .describedGroups() + .asScala + .toMap + + assertDescribedGroup(groups, "grp2", GroupType.CONSUMER, ConsumerGroupState.EMPTY) + } + } + + @ClusterTest( + types = Array(Type.KRAFT), + serverProperties = Array( + new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") + ) + ) + def testCoordinatorFailoverAfterCompactingPartitionWithConsumerGroupDeleted(): Unit = { + withAdmin { admin => + TestUtils.createTopicWithAdminRaw( + admin = admin, + topic = "foo", + numPartitions = 3 + ) + + // Create a consumer group grp3 with one member. The member subscribes to foo and leaves the group. Then + // the group is deleted. This creates tombstones to delete the member, the group and the offsets. + withConsumer(groupId = "grp3", groupProtocol = GroupProtocol.CONSUMER) { consumer => + consumer.subscribe(List("foo").asJava) + TestUtils.waitUntilTrue(() => { + consumer.poll(Duration.ofMillis(50)) + consumer.assignment().asScala.nonEmpty + }, msg = "Consumer did not get a non-empty assignment") + } + + admin + .deleteConsumerGroups(List("grp3").asJava) + .deletedGroups() + .get("grp3") + .get(10, TimeUnit.SECONDS) + } + + // Force a compaction. + rollAndCompactConsumerOffsets() + + // Restart the broker to reload the group coordinator. + cluster.shutdownBroker(0) + cluster.startBroker(0) + + // Verify the state of the groups to ensure that the group coordinator + // was correctly loaded. If replaying any of the records fails, the + // group coordinator won't be available. 
+ withAdmin { admin => + val groups = admin + .describeConsumerGroups(List("grp3").asJava) + .describedGroups() + .asScala + .toMap + + assertDescribedGroup(groups, "grp3", GroupType.CLASSIC, ConsumerGroupState.DEAD) + } } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec(quorum: String): Unit = { - val consumer = TestUtils.createConsumer(bootstrapServers()) - val offsetMap = Map( - new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) -> new OffsetAndMetadata(10, "") - ).asJava - consumer.commitSync(offsetMap) - val logManager = brokers.head.logManager - def getGroupMetadataLogOpt: Option[UnifiedLog] = - logManager.getLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)) - - TestUtils.waitUntilTrue(() => getGroupMetadataLogOpt.exists(_.logSegments.asScala.exists(_.log.batches.asScala.nonEmpty)), - "Commit message not appended in time") - - val logSegments = getGroupMetadataLogOpt.get.logSegments.asScala - val incorrectCompressionCodecs = logSegments - .flatMap(_.log.batches.asScala.map(_.compressionType)) - .filter(_ != offsetsTopicCompressionCodec) - assertEquals(Seq.empty, incorrectCompressionCodecs, "Incorrect compression codecs should be empty") - - consumer.close() + @ClusterTest( + types = Array(Type.KRAFT), + serverProperties = Array( + new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") + ) + ) + def testCoordinatorFailoverAfterCompactingPartitionWithUpgradedConsumerGroup(): Unit = { + withAdmin { admin => + TestUtils.createTopicWithAdminRaw( + admin = admin, + topic = "foo", + numPartitions = 3 + ) + + // Create a classic group grp4 with one member. Then upgrade the group to the consumer + // protocol. + withConsumer(groupId = "grp4", groupProtocol = GroupProtocol.CLASSIC) { consumer => + consumer.subscribe(List("foo").asJava) + TestUtils.waitUntilTrue(() => { + consumer.poll(Duration.ofMillis(50)) + consumer.assignment().asScala.nonEmpty + }, msg = "Consumer did not get a non-empty assignment") + } + + withConsumer(groupId = "grp4", groupProtocol = GroupProtocol.CONSUMER) { consumer => + consumer.subscribe(List("foo").asJava) + TestUtils.waitUntilTrue(() => { + consumer.poll(Duration.ofMillis(50)) + consumer.assignment().asScala.nonEmpty + }, msg = "Consumer did not get a non-empty assignment") + } + } + + // Force a compaction. + rollAndCompactConsumerOffsets() + + // Restart the broker to reload the group coordinator. + cluster.shutdownBroker(0) + cluster.startBroker(0) + + // Verify the state of the groups to ensure that the group coordinator + // was correctly loaded. If replaying any of the records fails, the + // group coordinator won't be available. 
+ withAdmin { admin => + val groups = admin + .describeConsumerGroups(List("grp4").asJava) + .describedGroups() + .asScala + .toMap + + assertDescribedGroup(groups, "grp4", GroupType.CONSUMER, ConsumerGroupState.EMPTY) + } + } + + private def rollAndCompactConsumerOffsets(): Unit = { + val tp = new TopicPartition("__consumer_offsets", 0) + val broker = cluster.brokers.asScala.head._2 + val log = broker.logManager.getLog(tp).get + log.roll() + assertTrue(broker.logManager.cleaner.awaitCleaned(tp, 0)) + } + + private def withAdmin(f: Admin => Unit): Unit = { + val admin: Admin = cluster.createAdminClient() + try { + f(admin) + } finally { + admin.close() + } + } + + private def withConsumer( + groupId: String, + groupProtocol: GroupProtocol, + enableAutoCommit: Boolean = true + )(f: Consumer[Array[Byte], Array[Byte]] => Unit): Unit = { + val consumer = TestUtils.createConsumer( + brokerList = cluster.bootstrapServers(), + groupId = groupId, + groupProtocol = groupProtocol, + enableAutoCommit = enableAutoCommit + ) + try { + f(consumer) + } finally { + consumer.close() + } + } + + private def assertDescribedGroup( + groups: Map[String, KafkaFuture[ConsumerGroupDescription]], + groupId: String, + groupType: GroupType, + state: ConsumerGroupState + ): Unit = { + val group = groups(groupId).get(10, TimeUnit.SECONDS) + + assertEquals(groupId, group.groupId) + assertEquals(groupType, group.`type`) + assertEquals(state, group.state) + assertEquals(Collections.emptyList, group.members) } } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index c3a198643211..f503caf43c4f 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -748,9 +748,11 @@ object TestUtils extends Logging { trustStoreFile: Option[File] = None, saslProperties: Option[Properties] = None, keyDeserializer: Deserializer[K] = new ByteArrayDeserializer, - valueDeserializer: Deserializer[V] = new ByteArrayDeserializer): Consumer[K, V] = { + valueDeserializer: Deserializer[V] = new ByteArrayDeserializer, + groupProtocol: GroupProtocol = GroupProtocol.CLASSIC): Consumer[K, V] = { val consumerProps = new Properties consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + consumerProps.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.toString) consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset) consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId) consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit.toString) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 4816f780ab6a..58d0070af3e6 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -793,8 +793,7 @@ ConsumerGroup getOrMaybeCreateConsumerGroup( } else if (createIfNotExists && group.type() == CLASSIC && validateOnlineUpgrade((ClassicGroup) group)) { return convertToConsumerGroup((ClassicGroup) group, records); } else { - throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", - groupId)); + throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group", groupId)); } } } @@ -817,10 +816,7 @@ public ConsumerGroup 
consumerGroup( if (group.type() == CONSUMER) { return (ConsumerGroup) group; } else { - // We don't support upgrading/downgrading between protocols at the moment so - // we throw an exception if a group exists with the wrong type. - throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", - groupId)); + throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group", groupId)); } } @@ -842,18 +838,19 @@ ConsumerGroup consumerGroup( * created if it does not exist. * * @return A ConsumerGroup. - * @throws IllegalStateException if the group does not exist and createIfNotExists is false or - * if the group is not a consumer group. + * @throws GroupIdNotFoundException if the group does not exist and createIfNotExists is false or + * if the group is not a consumer group. + * @throws IllegalStateException if the group does not have the expected type. * Package private for testing. */ ConsumerGroup getOrMaybeCreatePersistedConsumerGroup( String groupId, boolean createIfNotExists - ) throws GroupIdNotFoundException { + ) throws GroupIdNotFoundException, IllegalStateException { Group group = groups.get(groupId); if (group == null && !createIfNotExists) { - throw new IllegalStateException(String.format("Consumer group %s not found.", groupId)); + throw new GroupIdNotFoundException(String.format("Consumer group %s not found", groupId)); } if (group == null) { @@ -861,14 +858,20 @@ ConsumerGroup getOrMaybeCreatePersistedConsumerGroup( groups.put(groupId, consumerGroup); metrics.onConsumerGroupStateTransition(null, consumerGroup.state()); return consumerGroup; + } else if (group.type() == CONSUMER) { + return (ConsumerGroup) group; + } else if (group.type() == CLASSIC && ((ClassicGroup) group).isSimpleGroup()) { + // If the group is a simple classic group, it was automatically created to hold committed + // offsets if no group existed. Simple classic groups are not backed by any records + // in the __consumer_offsets topic hence we can safely replace it here. Without this, + // replaying consumer group records after offset commit records would not work. + ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics); + groups.put(groupId, consumerGroup); + metrics.onConsumerGroupStateTransition(null, consumerGroup.state()); + metrics.onClassicGroupStateTransition(EMPTY, null); + return consumerGroup; } else { - if (group.type() == CONSUMER) { - return (ConsumerGroup) group; - } else { - // We don't support upgrading/downgrading between protocols at the moment so - // we throw an exception if a group exists with the wrong type. - throw new IllegalStateException(String.format("Group %s is not a consumer group.", groupId)); - } + throw new IllegalStateException(String.format("Group %s is not a consumer group", groupId)); } } @@ -904,10 +907,7 @@ ClassicGroup getOrMaybeCreateClassicGroup( if (group.type() == CLASSIC) { return (ClassicGroup) group; } else { - // We don't support upgrading/downgrading between protocols at the moment so - // we throw an exception if a group exists with the wrong type. 
- throw new GroupIdNotFoundException(String.format("Group %s is not a classic group.", - groupId)); + throw new GroupIdNotFoundException(String.format("Group %s is not a classic group.", groupId)); } } } @@ -930,10 +930,7 @@ public ClassicGroup classicGroup( if (group.type() == CLASSIC) { return (ClassicGroup) group; } else { - // We don't support upgrading/downgrading between protocols at the moment so - // we throw an exception if a group exists with the wrong type. - throw new GroupIdNotFoundException(String.format("Group %s is not a classic group.", - groupId)); + throw new GroupIdNotFoundException(String.format("Group %s is not a classic group.", groupId)); } } @@ -3216,7 +3213,14 @@ public void replay( String groupId = key.groupId(); String memberId = key.memberId(); - ConsumerGroup consumerGroup = getOrMaybeCreatePersistedConsumerGroup(groupId, value != null); + ConsumerGroup consumerGroup; + try { + consumerGroup = getOrMaybeCreatePersistedConsumerGroup(groupId, value != null); + } catch (GroupIdNotFoundException ex) { + // If the group does not exist and a tombstone is replayed, we can ignore it. + return; + } + Set oldSubscribedTopicNames = new HashSet<>(consumerGroup.subscribedTopicNames().keySet()); if (value != null) { @@ -3225,7 +3229,14 @@ public void replay( .updateWith(value) .build()); } else { - ConsumerGroupMember oldMember = consumerGroup.getOrMaybeCreateMember(memberId, false); + ConsumerGroupMember oldMember; + try { + oldMember = consumerGroup.getOrMaybeCreateMember(memberId, false); + } catch (UnknownMemberIdException ex) { + // If the member does not exist, we can ignore it. + return; + } + if (oldMember.memberEpoch() != LEAVE_GROUP_MEMBER_EPOCH) { throw new IllegalStateException("Received a tombstone record to delete member " + memberId + " but did not receive ConsumerGroupCurrentMemberAssignmentValue tombstone."); @@ -3331,7 +3342,14 @@ public void replay( ConsumerGroup consumerGroup = getOrMaybeCreatePersistedConsumerGroup(groupId, true); consumerGroup.setGroupEpoch(value.epoch()); } else { - ConsumerGroup consumerGroup = getOrMaybeCreatePersistedConsumerGroup(groupId, false); + ConsumerGroup consumerGroup; + try { + consumerGroup = getOrMaybeCreatePersistedConsumerGroup(groupId, false); + } catch (GroupIdNotFoundException ex) { + // If the group does not exist, we can ignore the tombstone. + return; + } + if (!consumerGroup.members().isEmpty()) { throw new IllegalStateException("Received a tombstone record to delete group " + groupId + " but the group still has " + consumerGroup.members().size() + " members."); @@ -3363,7 +3381,14 @@ public void replay( ConsumerGroupPartitionMetadataValue value ) { String groupId = key.groupId(); - ModernGroup group = getOrMaybeCreatePersistedConsumerGroup(groupId, false); + + ConsumerGroup group; + try { + group = getOrMaybeCreatePersistedConsumerGroup(groupId, value != null); + } catch (GroupIdNotFoundException ex) { + // If the group does not exist, we can ignore the tombstone. 
+ return; + } if (value != null) { Map subscriptionMetadata = new HashMap<>(); @@ -3389,11 +3414,18 @@ public void replay( ) { String groupId = key.groupId(); String memberId = key.memberId(); - ModernGroup group = getOrMaybeCreatePersistedConsumerGroup(groupId, false); if (value != null) { + ConsumerGroup group = getOrMaybeCreatePersistedConsumerGroup(groupId, true); group.updateTargetAssignment(memberId, Assignment.fromRecord(value)); } else { + ConsumerGroup group; + try { + group = getOrMaybeCreatePersistedConsumerGroup(groupId, false); + } catch (GroupIdNotFoundException ex) { + // If the group does not exist, we can ignore the tombstone. + return; + } group.removeTargetAssignment(memberId); } } @@ -3411,11 +3443,18 @@ public void replay( ConsumerGroupTargetAssignmentMetadataValue value ) { String groupId = key.groupId(); - ModernGroup group = getOrMaybeCreatePersistedConsumerGroup(groupId, false); if (value != null) { + ConsumerGroup group = getOrMaybeCreatePersistedConsumerGroup(groupId, true); group.setTargetAssignmentEpoch(value.assignmentEpoch()); } else { + ConsumerGroup group; + try { + group = getOrMaybeCreatePersistedConsumerGroup(groupId, false); + } catch (GroupIdNotFoundException ex) { + // If the group does not exist, we can ignore the tombstone. + return; + } if (!group.targetAssignment().isEmpty()) { throw new IllegalStateException("Received a tombstone record to delete target assignment of " + groupId + " but the assignment still has " + group.targetAssignment().size() + " members."); @@ -3438,15 +3477,30 @@ public void replay( String groupId = key.groupId(); String memberId = key.memberId(); - ConsumerGroup group = getOrMaybeCreatePersistedConsumerGroup(groupId, false); - ConsumerGroupMember oldMember = group.getOrMaybeCreateMember(memberId, false); - if (value != null) { + ConsumerGroup group = getOrMaybeCreatePersistedConsumerGroup(groupId, true); + ConsumerGroupMember oldMember = group.getOrMaybeCreateMember(memberId, true); ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(oldMember) .updateWith(value) .build(); group.updateMember(newMember); } else { + ConsumerGroup group; + try { + group = getOrMaybeCreatePersistedConsumerGroup(groupId, false); + } catch (GroupIdNotFoundException ex) { + // If the group does not exist, we can ignore the tombstone. + return; + } + + ConsumerGroupMember oldMember; + try { + oldMember = group.getOrMaybeCreateMember(memberId, false); + } catch (UnknownMemberIdException ex) { + // If the member does not exist, we can ignore the tombstone. + return; + } + ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(oldMember) .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH) .setPreviousMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java index 9d1181a2fe2a..907380289e6c 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java @@ -292,6 +292,13 @@ public Optional protocolType() { return this.protocolType; } + /** + * @return True if the group is a simple group. + */ + public boolean isSimpleGroup() { + return !protocolType.isPresent() && isEmpty() && pendingJoinMembers.isEmpty(); + } + /** * @return the current group state. 
*/ diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java index 3bcbeedc5525..508b1c7d1314 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java @@ -17,6 +17,7 @@ package org.apache.kafka.coordinator.group.modern; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.coordinator.group.Group; import org.apache.kafka.coordinator.group.Utils; @@ -578,8 +579,9 @@ public static SubscriptionType subscriptionType( * created if it does not exist. * * @return A ConsumerGroupMember. + * @throws UnknownMemberIdException when the member does not exist and createIfNotExists is false. */ - public abstract T getOrMaybeCreateMember(String memberId, boolean createIfNotExists); + public abstract T getOrMaybeCreateMember(String memberId, boolean createIfNotExists) throws UnknownMemberIdException; /** * Adds or updates the member. diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java index 556e67fe9852..817cf7bbe24c 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java @@ -221,11 +221,12 @@ public String staticMemberId(String groupInstanceId) { * created if it does not exist. * * @return A ConsumerGroupMember. + * @throws UnknownMemberIdException when the member does not exist and createIfNotExists is false. */ public ConsumerGroupMember getOrMaybeCreateMember( String memberId, boolean createIfNotExists - ) { + ) throws UnknownMemberIdException { ConsumerGroupMember member = members.get(memberId); if (member != null) return member; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java index b7c0c4338423..1e641dfbe87a 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java @@ -132,11 +132,12 @@ public ShareGroupState state(long committedOffset) { * created if it does not exist. * * @return A ShareGroupMember. + * @throws UnknownMemberIdException when the member does not exist and createIfNotExists is false. 
*/ public ShareGroupMember getOrMaybeCreateMember( String memberId, boolean createIfNotExists - ) { + ) throws UnknownMemberIdException { ShareGroupMember member = members.get(memberId); if (member != null) return member; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 22ae88aaf536..4fbe8967ad7d 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -9569,7 +9569,7 @@ public void testOnConsumerGroupStateTransition() { verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY, null); // Replaying a tombstone for a group that has already been removed should not decrement metric. - tombstones.forEach(tombstone -> assertThrows(IllegalStateException.class, () -> context.replay(tombstone))); + tombstones.forEach(context::replay); verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY, null); } @@ -9586,8 +9586,8 @@ public void testOnConsumerGroupStateTransitionOnLoading() { context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord("group-id")); context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord("group-id")); IntStream.range(0, 3).forEach(__ -> { - assertThrows(IllegalStateException.class, () -> context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord("group-id"))); - assertThrows(IllegalStateException.class, () -> context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord("group-id"))); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord("group-id")); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord("group-id")); }); verify(context.metrics, times(1)).onConsumerGroupStateTransition(null, ConsumerGroup.ConsumerGroupState.EMPTY); @@ -14285,6 +14285,169 @@ public void testConsumerGroupDynamicConfigs() { context.assertNoRebalanceTimeout(groupId, memberId); } + @Test + public void testReplayConsumerGroupMemberMetadata() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") + .setClientId("clientid") + .setClientHost("clienthost") + .setServerAssignorName("range") + .setRackId("rackid") + .setSubscribedTopicNames(Collections.singletonList("foo")) + .build(); + + // The group and the member are created if they do not exist. + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord("foo", member)); + assertEquals(member, context.groupMetadataManager.consumerGroup("foo").getOrMaybeCreateMember("member", false)); + } + + @Test + public void testReplayConsumerGroupMemberMetadataTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group still exists but the member is already gone. Replaying the + // ConsumerGroupMemberMetadata tombstone should be a no-op. 
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo", 10)); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord("foo", "m1")); + assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.consumerGroup("foo").getOrMaybeCreateMember("m1", false)); + + // The group may not exist at all. Replaying the ConsumerGroupMemberMetadata tombstone + // should be a no-op. + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord("bar", "m1")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.consumerGroup("bar")); + } + + @Test + public void testReplayConsumerGroupMetadata() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group is created if it does not exist. + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo", 10)); + assertEquals(10, context.groupMetadataManager.consumerGroup("foo").groupEpoch()); + } + + @Test + public void testReplayConsumerGroupMetadataTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group may not exist at all. Replaying the ConsumerGroupMetadata tombstone + // should be a no-op. + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord("foo")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.consumerGroup("foo")); + } + + @Test + public void testReplayConsumerGroupPartitionMetadata() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + Map<String, TopicMetadata> metadata = Collections.singletonMap( + "bar", + new TopicMetadata(Uuid.randomUuid(), "bar", 10, Collections.emptyMap()) + ); + + // The group is created if it does not exist. + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord("foo", metadata)); + assertEquals(metadata, context.groupMetadataManager.consumerGroup("foo").subscriptionMetadata()); + } + + @Test + public void testReplayConsumerGroupPartitionMetadataTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group may not exist at all. Replaying the ConsumerGroupPartitionMetadata tombstone + // should be a no-op. + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord("foo")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.consumerGroup("foo")); + } + + @Test + public void testReplayConsumerGroupTargetAssignmentMember() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + Map<Uuid, Set<Integer>> assignment = mkAssignment( + mkTopicAssignment(Uuid.randomUuid(), 0, 1, 2) + ); + + // The group is created if it does not exist. + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord("foo", "m1", assignment)); + assertEquals(assignment, context.groupMetadataManager.consumerGroup("foo").targetAssignment("m1").partitions()); + } + + @Test + public void testReplayConsumerGroupTargetAssignmentMemberTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group may not exist at all. Replaying the ConsumerGroupTargetAssignmentMember tombstone + // should be a no-op. 
+ context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord("foo", "m1")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.consumerGroup("foo")); + } + + @Test + public void testReplayConsumerGroupTargetAssignmentMetadata() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group is created if it does not exist. + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord("foo", 10)); + assertEquals(10, context.groupMetadataManager.consumerGroup("foo").assignmentEpoch()); + } + + @Test + public void testReplayConsumerGroupTargetAssignmentMetadataTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group may not exist at all. Replaying the ConsumerGroupTargetAssignmentMetadata tombstone + // should be a no-op. + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord("foo")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.consumerGroup("foo")); + } + + @Test + public void testReplayConsumerGroupCurrentMemberAssignment() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member") + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setState(MemberState.UNRELEASED_PARTITIONS) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(Uuid.randomUuid(), 0, 1, 2))) + .build(); + + // The group and the member are created if they do not exist. + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord("bar", member)); + assertEquals(member, context.groupMetadataManager.consumerGroup("bar").getOrMaybeCreateMember("member", false)); + } + + @Test + public void testReplayConsumerGroupCurrentMemberAssignmentTombstone() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + // The group still exists but the member is already gone. Replaying the + // ConsumerGroupCurrentMemberAssignment tombstone should be a no-op. + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo", 10)); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord("foo", "m1")); + assertThrows(UnknownMemberIdException.class, () -> context.groupMetadataManager.consumerGroup("foo").getOrMaybeCreateMember("m1", false)); + + // The group may not exist at all. Replaying the ConsumerGroupCurrentMemberAssignment tombstone + // should be a no-op. + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord("bar", "m1")); + assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.consumerGroup("bar")); + } + @Test public void testConsumerGroupHeartbeatOnShareGroup() { String groupId = "group-foo";