KAFKA-17306; Soften the validation when replaying tombstones (#16898)
This patch fixes a few bugs in the replay logic of the consumer group records:
* The first issue is that the logic assumed that the group or the member exists when tombstones are replayed. This is obviously incorrect after a restart: the group or the member may not be there anymore if the __consumer_offsets partition only contains tombstones for it. The patch fixes this by treating tombstones as no-ops when the entity does not exist (see the sketch after this list).
* The second issue is that the logic assumed that consumer group records always appear in a specific order in the log, so it only accepted creating a consumer group when a `ConsumerGroupMemberMetadata` record was replayed. This is incorrect too: over the lifetime of a consumer group, the records may appear in different orders. The patch fixes this by allowing any consumer group record to create the group.
* The third issue is that offset commit records for a consumer group may be replayed before the consumer group itself is created while replaying its records. By default, the OffsetMetadataManager creates a simple classic group to hold those offset commits. When the consumer group records are finally replayed, the logic fails because a classic group already exists. The patch fixes this by converting the simple classic group to a consumer group when records for a consumer group are replayed.
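
As a rough illustration of the three fixes, here is a minimal, self-contained Scala sketch. The types and method names are illustrative stand-ins, not the actual group coordinator API:

```scala
import scala.collection.mutable

// Illustrative stand-ins for the coordinator's group types (not Kafka's API).
sealed trait Group { def groupId: String }
final case class ClassicGroup(groupId: String) extends Group
final case class ConsumerGroup(
  groupId: String,
  members: mutable.Set[String] = mutable.Set.empty
) extends Group

object ReplaySketch {
  private val groups = mutable.Map.empty[String, Group]

  // Fix 1: a tombstone for a missing group or member is a no-op, since after
  // a restart the partition may only contain tombstones for that entity.
  def replayMemberTombstone(groupId: String, memberId: String): Unit =
    groups.get(groupId) match {
      case Some(g: ConsumerGroup) => g.members.remove(memberId)
      case _ => // no-op: the group is already gone
    }

  // Fix 2: any consumer group record may create the group, because records
  // are not guaranteed to appear in a fixed order in the log.
  // Fix 3: a simple classic group created to hold early offset commits is
  // converted to a consumer group instead of failing the replay.
  def replayMemberMetadata(groupId: String, memberId: String): Unit = {
    val group = groups.get(groupId) match {
      case Some(g: ConsumerGroup) => g
      case _ => // absent, or a simple classic group holding offset commits
        val g = ConsumerGroup(groupId)
        groups.update(groupId, g)
        g
    }
    group.members.add(memberId)
  }
}
```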

All those combinations are hard to cover with unit tests. This patch adds integration tests which reproduce some of these record interleavings; I used them to reproduce the issues described above.
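
For instance, one interleaving of the kind the tests replay (schematic rendering, not the literal test data) looks like this:

```scala
// A schematic interleaving that trips the old replay logic: offsets for "grp"
// are committed before any consumer group record, and tombstones refer to
// state that is already gone (e.g. after a restart).
val interleaving = Seq(
  "OffsetCommit(group=grp)",                           // creates a simple classic group
  "ConsumerGroupMemberMetadata(group=grp, member=m1)", // must convert it, not fail
  "ConsumerGroupMemberMetadata tombstone(group=grp, member=m1)",
  "ConsumerGroupMetadata tombstone(group=grp)"         // no-op if the group is gone
)
```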

Reviewers: TengYao Chi <[email protected]>, Jeff Kim <[email protected]>, Justine Olshan <[email protected]>, Chia-Ping Tsai <[email protected]>
dajac authored Sep 10, 2024
1 parent cd7670d commit 31f7905
Showing 9 changed files with 618 additions and 88 deletions.
@@ -140,12 +140,20 @@ class CoordinatorLoaderImpl[T](
             batch.asScala.foreach { record =>
               val controlRecord = ControlRecordType.parse(record.key)
               if (controlRecord == ControlRecordType.COMMIT) {
+                if (isTraceEnabled) {
+                  trace(s"Replaying end transaction marker from $tp at offset ${record.offset} to commit transaction " +
+                    s"with producer id ${batch.producerId} and producer epoch ${batch.producerEpoch}.")
+                }
                 coordinator.replayEndTransactionMarker(
                   batch.producerId,
                   batch.producerEpoch,
                   TransactionResult.COMMIT
                 )
               } else if (controlRecord == ControlRecordType.ABORT) {
+                if (isTraceEnabled) {
+                  trace(s"Replaying end transaction marker from $tp at offset ${record.offset} to abort transaction " +
+                    s"with producer id ${batch.producerId} and producer epoch ${batch.producerEpoch}.")
+                }
                 coordinator.replayEndTransactionMarker(
                   batch.producerId,
                   batch.producerEpoch,
@@ -156,17 +164,42 @@ class CoordinatorLoaderImpl[T](
           } else {
             batch.asScala.foreach { record =>
               numRecords = numRecords + 1
-              try {
-                coordinator.replay(
-                  record.offset(),
-                  batch.producerId,
-                  batch.producerEpoch,
-                  deserializer.deserialize(record.key, record.value)
-                )
-              } catch {
-                case ex: UnknownRecordTypeException =>
-                  warn(s"Unknown record type ${ex.unknownType} while loading offsets and group metadata " +
-                    s"from $tp. Ignoring it. It could be a left over from an aborted upgrade.")
-              }
+
+              val coordinatorRecordOpt = {
+                try {
+                  Some(deserializer.deserialize(record.key, record.value))
+                } catch {
+                  case ex: UnknownRecordTypeException =>
+                    warn(s"Unknown record type ${ex.unknownType} while loading offsets and group metadata " +
+                      s"from $tp. Ignoring it. It could be a left over from an aborted upgrade.")
+                    None
+                  case ex: RuntimeException =>
+                    val msg = s"Deserializing record $record from $tp failed due to: ${ex.getMessage}"
+                    error(s"$msg.")
+                    throw new RuntimeException(msg, ex)
+                }
+              }
+
+              coordinatorRecordOpt.foreach { coordinatorRecord =>
+                try {
+                  if (isTraceEnabled) {
+                    trace(s"Replaying record $coordinatorRecord from $tp at offset ${record.offset()} " +
+                      s"with producer id ${batch.producerId} and producer epoch ${batch.producerEpoch}.")
+                  }
+                  coordinator.replay(
+                    record.offset(),
+                    batch.producerId,
+                    batch.producerEpoch,
+                    coordinatorRecord
+                  )
+                } catch {
+                  case ex: RuntimeException =>
+                    val msg = s"Replaying record $coordinatorRecord from $tp at offset ${record.offset()} " +
+                      s"with producer id ${batch.producerId} and producer epoch ${batch.producerEpoch} " +
+                      s"failed due to: ${ex.getMessage}"
+                    error(s"$msg.")
+                    throw new RuntimeException(msg, ex)
+                }
+              }
             }
           }
