KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE (apache#16072)

This patch was initially created in apache#15536.

When a commit covers multiple topic partitions and some, but not all, of them exceed the offset metadata size limit, the pending commit is not properly cleaned up, which leads to UNSTABLE_OFFSET_COMMIT errors when fetching the offsets with the read_committed isolation level. This change ensures that the invalid entries are never added to the pendingOffsetCommits set; a sketch of the filtering idea follows the reviewer list below.

Co-authored-by: Kyle Phelps <[email protected]>

Reviewers: Chia-Ping Tsai <[email protected]>, Justine Olshan <[email protected]>
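
For context, the fix hinges on dropping oversized metadata before the commit is staged; in the first hunk below, the staged map changes from offsetMetadata to filteredOffsetMetadata. The following is a minimal, self-contained sketch of that idea, not the broker's actual code: maxMetadataSize and the simplified OffsetAndMetadata case class are stand-ins for the offsetConfig.maxMetadataSize setting and Kafka's OffsetAndMetadata type used in the tests.

// Minimal sketch of the filtering idea (stand-in types, not GroupMetadataManager code).
final case class OffsetAndMetadata(offset: Long, metadata: String, commitTimestampMs: Long)

object OffsetFilterSketch {
  // Stand-in for offsetConfig.maxMetadataSize.
  val maxMetadataSize: Int = 4096

  // An entry is acceptable only if its metadata fits within the configured limit.
  def validateOffsetMetadataLength(metadata: String): Boolean =
    metadata == null || metadata.length <= maxMetadataSize

  // Keep only the valid entries; the oversized ones are never staged as pending
  // commits, which is what the patch changes in storeOffsets.
  def filterOffsets[K](offsetMetadata: Map[K, OffsetAndMetadata]): Map[K, OffsetAndMetadata] =
    offsetMetadata.filter { case (_, om) => validateOffsetMetadataLength(om.metadata) }
}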
dajac authored and wernerdv committed Jun 3, 2024
1 parent b92dd52 commit 9e12e46
Showing 2 changed files with 130 additions and 2 deletions.
GroupMetadataManager.scala
@@ -494,9 +494,9 @@ class GroupMetadataManager(brokerId: Int,

  if (isTxnOffsetCommit) {
    addProducerGroup(producerId, group.groupId)
-   group.prepareTxnOffsetCommit(producerId, offsetMetadata)
+   group.prepareTxnOffsetCommit(producerId, filteredOffsetMetadata)
  } else {
-   group.prepareOffsetCommit(offsetMetadata)
+   group.prepareOffsetCommit(filteredOffsetMetadata)
  }

  appendForGroup(group, records, requestLocal, putCacheCallback, verificationGuards)
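
With only the filtered entries staged above, the completion callback (putCacheCallback, passed to appendForGroup) still has to report a result for every requested partition: entries that were filtered out are answered with OFFSET_METADATA_TOO_LARGE, which is what both tests below assert. A hedged sketch of that mapping, reusing the stand-in OffsetAndMetadata from the earlier sketch:

import org.apache.kafka.common.protocol.Errors

// Sketch only: map every requested partition to a result. Entries that passed the
// size check get the append outcome; filtered-out entries get OFFSET_METADATA_TOO_LARGE.
def commitStatusSketch[K](
  requested: Map[K, OffsetAndMetadata],
  isValid: String => Boolean,
  appendResult: Errors
): Map[K, Errors] =
  requested.map { case (tp, om) =>
    if (isValid(om.metadata)) tp -> appendResult
    else tp -> Errors.OFFSET_METADATA_TOO_LARGE
  }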
GroupMetadataManagerTest.scala
@@ -1664,6 +1664,134 @@ class GroupMetadataManagerTest {
    assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
  }

  @Test
  def testOffsetMetadataTooLargePartialFailure(): Unit = {
    val memberId = ""
    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
    val validTopicIdPartition = new TopicIdPartition(topicIdPartition.topicId, 1, "foo")
    val offset = 37
    val requireStable = true

    groupMetadataManager.addOwnedPartition(groupPartitionId)
    val group = new GroupMetadata(groupId, Empty, time)
    groupMetadataManager.addGroup(group)

    val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
    val offsets = immutable.Map(
      topicIdPartition -> OffsetAndMetadata(offset, "s" * (offsetConfig.maxMetadataSize + 1), time.milliseconds()),
      validTopicIdPartition -> OffsetAndMetadata(offset, "", time.milliseconds())
    )

    expectAppendMessage(Errors.NONE)

    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
      commitErrors = Some(errors)
    }

    assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
    groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, offsets, callback, verificationGuard = None)
    assertTrue(group.hasOffsets)

    assertEquals(Some(Map(
      topicIdPartition -> Errors.OFFSET_METADATA_TOO_LARGE,
      validTopicIdPartition -> Errors.NONE)
    ), commitErrors)

    val cachedOffsets = groupMetadataManager.getOffsets(
      groupId,
      requireStable,
      Some(Seq(topicIdPartition.topicPartition, validTopicIdPartition.topicPartition))
    )

    assertEquals(
      Some(OffsetFetchResponse.INVALID_OFFSET),
      cachedOffsets.get(topicIdPartition.topicPartition).map(_.offset)
    )
    assertEquals(
      Some(Errors.NONE),
      cachedOffsets.get(topicIdPartition.topicPartition).map(_.error)
    )
    assertEquals(
      Some(offset),
      cachedOffsets.get(validTopicIdPartition.topicPartition).map(_.offset)
    )

    assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
  }

  @Test
  def testTransactionalCommitOffsetWithOffsetMetadataTooLargePartialFailure(): Unit = {
    val memberId = ""
    val foo0 = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
    val foo1 = new TopicIdPartition(Uuid.randomUuid(), 1, "foo")
    val producerId = 232L
    val producerEpoch = 0.toShort

    groupMetadataManager.addOwnedPartition(groupPartitionId)

    val group = new GroupMetadata(groupId, Empty, time)
    groupMetadataManager.addGroup(group)

    val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
    val offsets = immutable.Map(
      foo0 -> OffsetAndMetadata(37, "", time.milliseconds()),
      foo1 -> OffsetAndMetadata(38, "s" * (offsetConfig.maxMetadataSize + 1), time.milliseconds())
    )

    val capturedResponseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
      ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
    when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None

    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
      commitErrors = Some(errors)
    }

    val verificationGuard = new VerificationGuard()

    groupMetadataManager.storeOffsets(
      group,
      memberId,
      offsetTopicPartition,
      offsets,
      callback,
      producerId,
      producerEpoch,
      verificationGuard = Some(verificationGuard)
    )
    assertTrue(group.hasOffsets)
    assertTrue(group.allOffsets.isEmpty)

    verify(replicaManager).appendRecords(anyLong(),
      anyShort(),
      any(),
      any(),
      any[Map[TopicPartition, MemoryRecords]],
      capturedResponseCallback.capture(),
      any[Option[ReentrantLock]],
      any(),
      any(),
      any(),
      ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard)))
    verify(replicaManager).getMagic(any())
    capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
      new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))

    assertEquals(Some(Map(
      foo0 -> Errors.NONE,
      foo1 -> Errors.OFFSET_METADATA_TOO_LARGE
    )), commitErrors)

    assertTrue(group.hasOffsets)
    assertTrue(group.allOffsets.isEmpty)

    group.completePendingTxnOffsetCommit(producerId, isCommit = true)
    assertTrue(group.hasOffsets)
    assertFalse(group.allOffsets.isEmpty)
    assertEquals(offsets.get(foo0), group.offset(foo0.topicPartition))
  }

  @Test
  def testExpireOffset(): Unit = {
    val memberId = ""
