diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
index e2ac2f66ead27..a9a50b77583bd 100644
--- a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
+++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
@@ -139,7 +139,11 @@ class CoordinatorLoaderImpl[T](
           batch.asScala.foreach { record =>
             numRecords = numRecords + 1
             try {
-              coordinator.replay(deserializer.deserialize(record.key, record.value))
+              coordinator.replay(
+                batch.producerId,
+                batch.producerEpoch,
+                deserializer.deserialize(record.key, record.value)
+              )
             } catch {
               case ex: UnknownRecordTypeException =>
                 warn(s"Unknown record type ${ex.unknownType} while loading offsets and group metadata " +
diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
index 5ee576ff3c52e..bad39e73c6e55 100644
--- a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
@@ -21,7 +21,7 @@ import kafka.server.{ActionQueue, ReplicaManager}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.RecordTooLargeException
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, TimestampType}
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, TimestampType}
 import org.apache.kafka.common.record.Record.EMPTY_HEADERS
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Time
@@ -106,13 +106,17 @@ class CoordinatorPartitionWriter[T](
    * Write records to the partitions. Records are written in one batch so
    * atomicity is guaranteed.
    *
-   * @param tp      The partition to write records to.
-   * @param records The list of records. The records are written in a single batch.
+   * @param tp            The partition to write records to.
+   * @param producerId    The producer id.
+   * @param producerEpoch The producer epoch.
+   * @param records       The list of records. The records are written in a single batch.
    * @return The log end offset right after the written records.
    * @throws KafkaException Any KafkaException caught during the write operation.
   */
  override def append(
    tp: TopicPartition,
+   producerId: Long,
+   producerEpoch: Short,
    records: util.List[T]
  ): Long = {
    if (records.isEmpty) throw new IllegalStateException("records must be non-empty.")
@@ -129,7 +133,12 @@ class CoordinatorPartitionWriter[T](
       compressionType,
       TimestampType.CREATE_TIME,
       0L,
-      maxBatchSize
+      time.milliseconds(),
+      producerId,
+      producerEpoch,
+      0,
+      producerId != RecordBatch.NO_PRODUCER_ID,
+      RecordBatch.NO_PARTITION_LEADER_EPOCH
     )
 
     records.forEach { record =>
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
index ef19d732c3436..5bb4eb5b756c9 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
@@ -21,7 +21,7 @@ import kafka.server.ReplicaManager
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException
-import org.apache.kafka.common.record.{CompressionType, FileRecords, MemoryRecords, SimpleRecord}
+import org.apache.kafka.common.record.{CompressionType, FileRecords, MemoryRecords, RecordBatch, SimpleRecord}
 import org.apache.kafka.common.utils.{MockTime, Time}
 import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.UnknownRecordTypeException
 import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, CoordinatorPlayback}
@@ -104,7 +104,7 @@ class CoordinatorLoaderImplTest {
     )) { loader =>
       when(replicaManager.getLog(tp)).thenReturn(Some(log))
       when(log.logStartOffset).thenReturn(0L)
-      when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(5L))
+      when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(7L))
 
       val readResult1 = logReadResult(startOffset = 0, records = Seq(
         new SimpleRecord("k1".getBytes, "v1".getBytes),
@@ -131,13 +131,27 @@ class CoordinatorLoaderImplTest {
         minOneMessage = true
       )).thenReturn(readResult2)
 
+      val readResult3 = logReadResult(startOffset = 5, producerId = 100L, producerEpoch = 5, records = Seq(
+        new SimpleRecord("k6".getBytes, "v6".getBytes),
+        new SimpleRecord("k7".getBytes, "v7".getBytes)
+      ))
+
+      when(log.read(
+        startOffset = 5L,
+        maxLength = 1000,
+        isolation = FetchIsolation.LOG_END,
+        minOneMessage = true
+      )).thenReturn(readResult3)
+
       assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
 
-      verify(coordinator).replay(("k1", "v1"))
-      verify(coordinator).replay(("k2", "v2"))
-      verify(coordinator).replay(("k3", "v3"))
-      verify(coordinator).replay(("k4", "v4"))
-      verify(coordinator).replay(("k5", "v5"))
+      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
+      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
+      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
+      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
+      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
+      verify(coordinator).replay(100L, 5.toShort, ("k6", "v6"))
+      verify(coordinator).replay(100L, 5.toShort, ("k7", "v7"))
     }
   }
@@ -220,7 +234,7 @@ class CoordinatorLoaderImplTest {
 
       loader.load(tp, coordinator).get(10, TimeUnit.SECONDS)
 
-      verify(coordinator).replay(("k2", "v2"))
+      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
     }
   }
 
@@ -354,14 +368,28 @@ class CoordinatorLoaderImplTest {
 
   private def logReadResult(
     startOffset: Long,
+    producerId: Long = RecordBatch.NO_PRODUCER_ID,
+    producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
     records: Seq[SimpleRecord]
   ): FetchDataInfo = {
     val fileRecords = mock(classOf[FileRecords])
-    val memoryRecords = MemoryRecords.withRecords(
-      startOffset,
-      CompressionType.NONE,
-      records: _*
-    )
+    val memoryRecords = if (producerId == RecordBatch.NO_PRODUCER_ID) {
+      MemoryRecords.withRecords(
+        startOffset,
+        CompressionType.NONE,
+        records: _*
+      )
+    } else {
+      MemoryRecords.withTransactionalRecords(
+        startOffset,
+        CompressionType.NONE,
+        producerId,
+        producerEpoch,
+        0,
+        RecordBatch.NO_PARTITION_LEADER_EPOCH,
+        records: _*
+      )
+    }
 
     when(fileRecords.sizeInBytes).thenReturn(memoryRecords.sizeInBytes)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
index badcb6f8cba58..121a1f119a1ce 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{MockTime, Time}
 import org.apache.kafka.coordinator.group.runtime.PartitionWriter
 import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
 import org.junit.jupiter.api.Test
 import org.mockito.{ArgumentCaptor, ArgumentMatchers}
 import org.mockito.Mockito.{mock, verify, when}
@@ -133,7 +133,12 @@ class CoordinatorPartitionWriterTest {
       ("k2", "v2"),
     )
 
-    assertEquals(11, partitionRecordWriter.append(tp, records.asJava))
+    assertEquals(11, partitionRecordWriter.append(
+      tp,
+      RecordBatch.NO_PRODUCER_ID,
+      RecordBatch.NO_PRODUCER_EPOCH,
+      records.asJava
+    ))
 
     val batch = recordsCapture.getValue.getOrElse(tp,
       throw new AssertionError(s"No records for $tp"))
@@ -149,6 +154,86 @@ class CoordinatorPartitionWriterTest {
     assertEquals(records, receivedRecords)
   }
 
+  @Test
+  def testTransactionalWriteRecords(): Unit = {
+    val tp = new TopicPartition("foo", 0)
+    val replicaManager = mock(classOf[ReplicaManager])
+    val time = new MockTime()
+    val partitionRecordWriter = new CoordinatorPartitionWriter(
+      replicaManager,
+      new StringKeyValueSerializer(),
+      CompressionType.NONE,
+      time
+    )
+
+    when(replicaManager.getLogConfig(tp)).thenReturn(Some(LogConfig.fromProps(
+      Collections.emptyMap(),
+      new Properties()
+    )))
+
+    val recordsCapture: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] =
+      ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]])
+    val callbackCapture: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
+      ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
+
+    when(replicaManager.appendRecords(
+      ArgumentMatchers.eq(0L),
+      ArgumentMatchers.eq(1.toShort),
+      ArgumentMatchers.eq(true),
+      ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
+      recordsCapture.capture(),
+      callbackCapture.capture(),
+      ArgumentMatchers.any(),
+      ArgumentMatchers.any(),
+      ArgumentMatchers.any(),
+      ArgumentMatchers.any(),
+      ArgumentMatchers.any()
+    )).thenAnswer(_ => {
+      callbackCapture.getValue.apply(Map(
+        tp -> new PartitionResponse(
+          Errors.NONE,
+          5,
+          10,
+          RecordBatch.NO_TIMESTAMP,
+          -1,
+          Collections.emptyList(),
+          ""
+        )
+      ))
+    })
+
+    val records = List(
+      ("k0", "v0"),
+      ("k1", "v1"),
+      ("k2", "v2"),
+    )
+
+    assertEquals(11, partitionRecordWriter.append(
+      tp,
+      100L,
+      50.toShort,
+      records.asJava
+    ))
+
+    val batch = recordsCapture.getValue.getOrElse(tp,
+      throw new AssertionError(s"No records for $tp"))
+    assertEquals(1, batch.batches().asScala.toList.size)
+
+    val firstBatch = batch.batches.asScala.head
+    assertEquals(100L, firstBatch.producerId)
+    assertEquals(50.toShort, firstBatch.producerEpoch)
+    assertTrue(firstBatch.isTransactional)
+
+    val receivedRecords = batch.records.asScala.map { record =>
+      (
+        Charset.defaultCharset().decode(record.key).toString,
+        Charset.defaultCharset().decode(record.value).toString,
+      )
+    }.toList
+
+    assertEquals(records, receivedRecords)
+  }
+
   @Test
   def testWriteRecordsWithFailure(): Unit = {
     val tp = new TopicPartition("foo", 0)
@@ -195,8 +280,12 @@ class CoordinatorPartitionWriterTest {
       ("k2", "v2"),
     )
 
-    assertThrows(classOf[NotLeaderOrFollowerException],
-      () => partitionRecordWriter.append(tp, records.asJava))
+    assertThrows(classOf[NotLeaderOrFollowerException], () => partitionRecordWriter.append(
+      tp,
+      RecordBatch.NO_PRODUCER_ID,
+      RecordBatch.NO_PRODUCER_EPOCH,
+      records.asJava)
+    )
   }
 
   @Test
@@ -224,8 +313,12 @@ class CoordinatorPartitionWriterTest {
       ("k1", new String(randomBytes)),
     )
 
-    assertThrows(classOf[RecordTooLargeException],
-      () => partitionRecordWriter.append(tp, records.asJava))
+    assertThrows(classOf[RecordTooLargeException], () => partitionRecordWriter.append(
+      tp,
+      RecordBatch.NO_PRODUCER_ID,
+      RecordBatch.NO_PRODUCER_EPOCH,
+      records.asJava)
+    )
   }
 
   @Test
@@ -244,8 +337,12 @@ class CoordinatorPartitionWriterTest {
       new Properties()
     )))
 
-    assertThrows(classOf[IllegalStateException],
-      () => partitionRecordWriter.append(tp, List.empty.asJava))
+    assertThrows(classOf[IllegalStateException], () => partitionRecordWriter.append(
+      tp,
+      RecordBatch.NO_PRODUCER_ID,
+      RecordBatch.NO_PRODUCER_EPOCH,
+      List.empty.asJava)
+    )
   }
 
   @Test
@@ -267,7 +364,11 @@ class CoordinatorPartitionWriterTest {
       ("k2", "v2"),
     )
 
-    assertThrows(classOf[NotLeaderOrFollowerException],
-      () => partitionRecordWriter.append(tp, records.asJava))
+    assertThrows(classOf[NotLeaderOrFollowerException], () => partitionRecordWriter.append(
+      tp,
+      RecordBatch.NO_PRODUCER_ID,
+      RecordBatch.NO_PRODUCER_EPOCH,
+      records.asJava)
+    )
   }
 }
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 53a6fee1f7af4..40b3b3175368b 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -590,12 +590,18 @@ private ApiMessage messageOrNull(ApiMessageAndVersion apiMessageAndVersion) {
 
     /**
      * Replays the Record to update the hard state of the group coordinator.
-
-     * @param record The record to apply to the state machine.
+     *
+     * @param producerId    The producer id.
+     * @param producerEpoch The producer epoch.
+     * @param record        The record to apply to the state machine.
      * @throws RuntimeException
      */
     @Override
-    public void replay(Record record) throws RuntimeException {
+    public void replay(
+        long producerId,
+        short producerEpoch,
+        Record record
+    ) throws RuntimeException {
         ApiMessageAndVersion key = record.key();
         ApiMessageAndVersion value = record.value();
 
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorPlayback.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorPlayback.java
index 5dd7934e533e7..b89ef6160ef08 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorPlayback.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorPlayback.java
@@ -25,12 +25,17 @@
  * @param <U> The type of the record.
  */
 public interface CoordinatorPlayback<U> {
-
     /**
      * Applies the given record to this object.
      *
-     * @param record A record.
+     * @param producerId    The producer id.
+     * @param producerEpoch The producer epoch.
+     * @param record        A record.
      * @throws RuntimeException if the record can not be applied.
      */
-    void replay(U record) throws RuntimeException;
+    void replay(
+        long producerId,
+        short producerEpoch,
+        U record
+    ) throws RuntimeException;
 }
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index 2812a0e3ec5a9..90303b0b31c58 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -21,6 +21,7 @@
 import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
 import org.apache.kafka.common.errors.NotCoordinatorException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -617,6 +618,21 @@ class CoordinatorWriteEvent<T> implements CoordinatorEvent, DeferredEvent {
          */
         final String name;
 
+        /**
+         * The transactional id.
+         */
+        final String transactionalId;
+
+        /**
+         * The producer id.
+         */
+        final long producerId;
+
+        /**
+         * The producer epoch.
+         */
+        final short producerEpoch;
+
         /**
          * The write operation to execute.
         */
@@ -650,10 +666,41 @@ class CoordinatorWriteEvent<T> implements CoordinatorEvent, DeferredEvent {
             String name,
             TopicPartition tp,
             CoordinatorWriteOperation<S, T, U> op
+        ) {
+            this(
+                name,
+                tp,
+                null,
+                RecordBatch.NO_PRODUCER_ID,
+                RecordBatch.NO_PRODUCER_EPOCH,
+                op
+            );
+        }
+
+        /**
+         * Constructor.
+         *
+         * @param name            The operation name.
+         * @param tp              The topic partition that the operation is applied to.
+         * @param transactionalId The transactional id.
+         * @param producerId      The producer id.
+         * @param producerEpoch   The producer epoch.
+         * @param op              The write operation.
+         */
+        CoordinatorWriteEvent(
+            String name,
+            TopicPartition tp,
+            String transactionalId,
+            long producerId,
+            short producerEpoch,
+            CoordinatorWriteOperation<S, T, U> op
         ) {
             this.tp = tp;
             this.name = name;
             this.op = op;
+            this.transactionalId = transactionalId;
+            this.producerId = producerId;
+            this.producerEpoch = producerEpoch;
             this.future = new CompletableFuture<>();
             this.createdTimeMs = time.milliseconds();
         }
@@ -697,12 +744,23 @@ public void run() {
 
                 try {
                     // Apply the records to the state machine.
                     if (result.replayRecords()) {
-                        result.records().forEach(context.coordinator::replay);
+                        result.records().forEach(record ->
+                            context.coordinator.replay(
+                                producerId,
+                                producerEpoch,
+                                record
+                            )
+                        );
                     }
 
                     // Write the records to the log and update the last written
                     // offset.
-                    long offset = partitionWriter.append(tp, result.records());
+                    long offset = partitionWriter.append(
+                        tp,
+                        producerId,
+                        producerEpoch,
+                        result.records()
+                    );
                     context.updateLastWrittenOffset(offset);
 
                     // Add the response to the deferred queue.
@@ -1239,6 +1297,43 @@ public <T> CompletableFuture<T> scheduleWriteOperation(
         return event.future;
     }
 
+    /**
+     * Schedules a transactional write operation.
+     *
+     * @param name            The name of the write operation.
+     * @param tp              The address of the coordinator (aka its topic-partitions).
+     * @param transactionalId The transactional id.
+     * @param producerId      The producer id.
+     * @param producerEpoch   The producer epoch.
+     * @param op              The write operation.
+     *
+     * @return A future that will be completed with the result of the write operation
+     *         when the operation is completed or an exception if the write operation failed.
+     *
+     * @param <T> The type of the result.
+     */
+    public <T> CompletableFuture<T> scheduleTransactionalWriteOperation(
+        String name,
+        TopicPartition tp,
+        String transactionalId,
+        long producerId,
+        short producerEpoch,
+        CoordinatorWriteOperation<S, T, U> op
+    ) {
+        throwIfNotRunning();
+        log.debug("Scheduled execution of transactional write operation {}.", name);
+        CoordinatorWriteEvent<T> event = new CoordinatorWriteEvent<>(
+            name,
+            tp,
+            transactionalId,
+            producerId,
+            producerEpoch,
+            op
+        );
+        enqueue(event);
+        return event.future;
+    }
+
     /**
      * Schedules a read operation.
      *
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java
index 1f8d4119f7ea5..e3efbfaf7b254 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java
@@ -83,13 +83,17 @@ void deregisterListener(
      * Write records to the partitions. Records are written in one batch so
      * atomicity is guaranteed.
      *
-     * @param tp      The partition to write records to.
-     * @param records The list of records. The records are written in a single batch.
+     * @param tp            The partition to write records to.
+     * @param producerId    The producer id.
+     * @param producerEpoch The producer epoch.
+     * @param records       The list of records. The records are written in a single batch.
      * @return The log end offset right after the written records.
     * @throws KafkaException Any KafkaException caught during the write operation.
      */
     long append(
         TopicPartition tp,
+        long producerId,
+        short producerEpoch,
         List<T> records
     ) throws KafkaException;
 }
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index b9e45bdeb2cbe..074516bfb6a9e 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -23,6 +23,7 @@
 import org.apache.kafka.common.message.OffsetCommitResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.RequestContext;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
@@ -282,12 +283,12 @@ public void testReplayOffsetCommit() {
         OffsetCommitKey key = new OffsetCommitKey();
         OffsetCommitValue value = new OffsetCommitValue();
 
-        coordinator.replay(new Record(
+        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 0),
             new ApiMessageAndVersion(value, (short) 0)
         ));
 
-        coordinator.replay(new Record(
+        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 1),
             new ApiMessageAndVersion(value, (short) 0)
         ));
@@ -313,12 +314,12 @@ public void testReplayOffsetCommitWithNullValue() {
 
         OffsetCommitKey key = new OffsetCommitKey();
 
-        coordinator.replay(new Record(
+        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 0),
             null
         ));
 
-        coordinator.replay(new Record(
+        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 1),
             null
         ));
@@ -345,7 +346,7 @@ public void testReplayConsumerGroupMetadata() {
         ConsumerGroupMetadataKey key = new ConsumerGroupMetadataKey();
         ConsumerGroupMetadataValue value = new ConsumerGroupMetadataValue();
 
-        coordinator.replay(new Record(
+        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 3),
             new ApiMessageAndVersion(value, (short) 0)
         ));
@@ -371,7 +372,7 @@ public void testReplayConsumerGroupMetadataWithNullValue() {
 
         ConsumerGroupMetadataKey key = new ConsumerGroupMetadataKey();
 
-        coordinator.replay(new Record(
+        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 3),
             null
         ));
@@ -398,7 +399,7 @@ public void testReplayConsumerGroupPartitionMetadata() {
         ConsumerGroupPartitionMetadataKey key = new ConsumerGroupPartitionMetadataKey();
         ConsumerGroupPartitionMetadataValue value = new ConsumerGroupPartitionMetadataValue();
 
-        coordinator.replay(new Record(
+        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 4),
             new ApiMessageAndVersion(value, (short) 0)
         ));
@@ -424,7 +425,7 @@ public void testReplayConsumerGroupPartitionMetadataWithNullValue() {
 
         ConsumerGroupPartitionMetadataKey key = new ConsumerGroupPartitionMetadataKey();
 
-        coordinator.replay(new Record(
+        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 4),
             null
         ));
@@ -451,7 +452,7 @@ public void testReplayConsumerGroupMemberMetadata() {
         ConsumerGroupMemberMetadataKey key = new ConsumerGroupMemberMetadataKey();
         ConsumerGroupMemberMetadataValue value = new ConsumerGroupMemberMetadataValue();
 
-        coordinator.replay(new Record(
+        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 5),
             new ApiMessageAndVersion(value, (short) 0)
         ));
@@ -477,7 +478,7 @@ public void testReplayConsumerGroupMemberMetadataWithNullValue() {
 
         ConsumerGroupMemberMetadataKey key = new ConsumerGroupMemberMetadataKey();
 
-        coordinator.replay(new Record(
+        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 5),
             null
         ));
@@ -504,7 +505,7 @@ public void testReplayConsumerGroupTargetAssignmentMetadata() {
         ConsumerGroupTargetAssignmentMetadataKey key = new ConsumerGroupTargetAssignmentMetadataKey();
         ConsumerGroupTargetAssignmentMetadataValue value = new ConsumerGroupTargetAssignmentMetadataValue();
 
-        coordinator.replay(new Record(
+        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 6),
             new ApiMessageAndVersion(value, (short) 0)
         ));
@@ -530,7 +531,7 @@ public void testReplayConsumerGroupTargetAssignmentMetadataWithNullValue() {
 
         ConsumerGroupTargetAssignmentMetadataKey key = new ConsumerGroupTargetAssignmentMetadataKey();
 
-        coordinator.replay(new Record(
+        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 6),
             null
         ));
@@ -557,7 +558,7 @@ public void testReplayConsumerGroupTargetAssignmentMember() {
         ConsumerGroupTargetAssignmentMemberKey key = new ConsumerGroupTargetAssignmentMemberKey();
         ConsumerGroupTargetAssignmentMemberValue value = new ConsumerGroupTargetAssignmentMemberValue();
 
-        coordinator.replay(new Record(
+        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 7),
             new ApiMessageAndVersion(value, (short) 0)
         ));
@@ -583,7 +584,7 @@ public void testReplayConsumerGroupTargetAssignmentMemberKeyWithNullValue() {
 
         ConsumerGroupTargetAssignmentMemberKey key = new ConsumerGroupTargetAssignmentMemberKey();
 
-        coordinator.replay(new Record(
+        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 7),
             null
         ));
@@ -610,7 +611,7 @@ public void testReplayConsumerGroupCurrentMemberAssignment() {
         ConsumerGroupCurrentMemberAssignmentKey key = new ConsumerGroupCurrentMemberAssignmentKey();
         ConsumerGroupCurrentMemberAssignmentValue value = new ConsumerGroupCurrentMemberAssignmentValue();
 
-        coordinator.replay(new Record(
+        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 8),
             new ApiMessageAndVersion(value, (short) 0)
         ));
@@ -636,7 +637,7 @@ public void testReplayConsumerGroupCurrentMemberAssignmentWithNullValue() {
 
         ConsumerGroupCurrentMemberAssignmentKey key = new ConsumerGroupCurrentMemberAssignmentKey();
 
-        coordinator.replay(new Record(
+        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 8),
             null
         ));
@@ -660,7 +661,12 @@ public void testReplayKeyCannotBeNull() {
             metricsShard
         );
 
-        assertThrows(NullPointerException.class, () -> coordinator.replay(new Record(null, null)));
+        assertThrows(NullPointerException.class, () ->
+            coordinator.replay(
+                RecordBatch.NO_PRODUCER_ID,
+                RecordBatch.NO_PRODUCER_EPOCH,
+                new Record(null, null))
+        );
     }
 
     @Test
@@ -682,10 +688,12 @@ public void testReplayWithUnsupportedVersion() {
         ConsumerGroupCurrentMemberAssignmentKey key = new ConsumerGroupCurrentMemberAssignmentKey();
         ConsumerGroupCurrentMemberAssignmentValue value = new ConsumerGroupCurrentMemberAssignmentValue();
 
-        assertThrows(IllegalStateException.class, () -> coordinator.replay(new Record(
-            new ApiMessageAndVersion(key, (short) 255),
-            new ApiMessageAndVersion(value, (short) 0)
-        )));
+        assertThrows(IllegalStateException.class, () ->
+            coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
+                new ApiMessageAndVersion(key, (short) 255),
+                new ApiMessageAndVersion(value, (short) 0)
+            ))
+        );
     }
 
     @Test
@@ -734,7 +742,7 @@ public void testReplayGroupMetadata() {
         GroupMetadataKey key = new GroupMetadataKey();
         GroupMetadataValue value = new GroupMetadataValue();
 
-        coordinator.replay(new Record(
+        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 2),
             new ApiMessageAndVersion(value, (short) 4)
         ));
@@ -760,7 +768,7 @@ public void testReplayGroupMetadataWithNullValue() {
 
         GroupMetadataKey key = new GroupMetadataKey();
 
-        coordinator.replay(new Record(
+        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 2),
             null
         ));
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
index e13f27d13a49d..2c589a7febac2 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
@@ -171,9 +171,19 @@ public void deregisterListener(TopicPartition tp, Listener listener) {
         }
 
         @Override
-        public long append(TopicPartition tp, List<String> records) throws KafkaException {
+        public long append(
+            TopicPartition tp,
+            long producerId,
+            short producerEpoch,
+            List<String> records
+        ) throws KafkaException {
             if (records.size() <= maxRecordsInBatch) {
-                return super.append(tp, records);
+                return super.append(
+                    tp,
+                    producerId,
+                    producerEpoch,
+                    records
+                );
             } else {
                 throw new KafkaException(String.format("Number of records %d greater than the maximum allowed %d.",
                     records.size(), maxRecordsInBatch));
@@ -197,7 +207,11 @@ private static class MockCoordinatorShard implements CoordinatorShard<String> {
         }
 
         @Override
-        public void replay(String record) throws RuntimeException {
+        public void replay(
+            long producerId,
+            short producerEpoch,
+            String record
+        ) throws RuntimeException {
             records.add(record);
         }
 
@@ -844,7 +858,11 @@ public void testScheduleWriteOpWhenReplayFails() {
         // an exception when replay is called.
         ctx.coordinator = new MockCoordinatorShard(ctx.snapshotRegistry, ctx.timer) {
             @Override
-            public void replay(String record) throws RuntimeException {
+            public void replay(
+                long producerId,
+                short producerEpoch,
+                String record
+            ) throws RuntimeException {
                 throw new IllegalArgumentException("error");
             }
         };
@@ -910,6 +928,75 @@ public void testScheduleWriteOpWhenWriteFails() {
         assertEquals(mkSet("record1", "record2"), ctx.coordinator.records());
     }
 
+    @Test
+    public void testScheduleTransactionalWriteOp() {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+        MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+        MockCoordinatorShardBuilder shardBuilder = new MockCoordinatorShardBuilder() {
+            @Override
+            public MockCoordinatorShard build() {
+                return coordinator;
+            }
+        };
+        MockCoordinatorShardBuilderSupplier shardBuilderSupplier = new MockCoordinatorShardBuilderSupplier() {
+            @Override
+            public CoordinatorShardBuilder<MockCoordinatorShard, String> get() {
+                return shardBuilder;
+            }
+        };
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(shardBuilderSupplier)
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify that the listener was registered.
+        verify(writer, times(1)).registerListener(eq(TP), any());
+
+        // Schedule a transactional write.
+        runtime.scheduleTransactionalWriteOperation(
+            "tnx-write",
+            TP,
+            "transactional-id",
+            100L,
+            (short) 50,
+            state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response")
+        );
+
+        // Verify that the writer got the records with the correct
+        // producer id and producer epoch.
+        verify(writer, times(1)).append(
+            eq(TP),
+            eq(100L),
+            eq((short) 50),
+            eq(Arrays.asList("record1", "record2"))
+        );
+
+        // Verify that the coordinator got the records with the correct
+        // producer id and producer epoch.
+        verify(coordinator, times(1)).replay(
+            eq(100L),
+            eq((short) 50),
+            eq("record1")
+        );
+        verify(coordinator, times(1)).replay(
+            eq(100L),
+            eq((short) 50),
+            eq("record2")
+        );
+    }
+
     @Test
     public void testScheduleReadOp() throws ExecutionException, InterruptedException, TimeoutException {
         MockTimer timer = new MockTimer();
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java
index f31b4b27a4dcc..759bbb2ced2e4 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java
@@ -86,6 +86,8 @@ public void deregisterListener(
     @Override
     public long append(
         TopicPartition tp,
+        long producerId,
+        short producerEpoch,
         List<T> records
     ) throws KafkaException {
         PartitionState state = partitionState(tp);
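Reviewer note, not part of the patch: the following standalone Java sketch illustrates the replay contract introduced above, where every record is replayed together with the producer id and epoch of the batch that contains it (the loader passes batch.producerId/batch.producerEpoch, non-transactional batches carry RecordBatch.NO_PRODUCER_ID). The class, field, and method names (BatchReplayExample, LoggingPlayback, replayAll, applied) are illustrative only; CoordinatorPlayback, RecordBatch, MemoryRecords, and their accessors are the real types referenced in the diff.

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.coordinator.group.runtime.CoordinatorPlayback;

public final class BatchReplayExample {

    // Illustrative playback: records from transactional batches are tagged with
    // their producer id/epoch, all other records are applied as-is.
    static final class LoggingPlayback implements CoordinatorPlayback<String> {
        final List<String> applied = new ArrayList<>();

        @Override
        public void replay(long producerId, short producerEpoch, String record) throws RuntimeException {
            if (producerId == RecordBatch.NO_PRODUCER_ID) {
                applied.add(record);
            } else {
                applied.add(record + " (pid=" + producerId + ", epoch=" + producerEpoch + ")");
            }
        }
    }

    // Mirrors what the CoordinatorLoaderImpl change does: iterate the batches and
    // replay each record with the producer id/epoch carried by its batch.
    // Assumes record values are non-null UTF-8 payloads.
    static void replayAll(MemoryRecords records, CoordinatorPlayback<String> playback) {
        for (MutableRecordBatch batch : records.batches()) {
            for (Record record : batch) {
                String value = StandardCharsets.UTF_8.decode(record.value()).toString();
                playback.replay(batch.producerId(), batch.producerEpoch(), value);
            }
        }
    }
}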
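Also not part of the patch: a minimal sketch of the batch-level difference that the new CoordinatorPartitionWriter code path and its tests rely on. MemoryRecords.withTransactionalRecords produces a batch that carries the producer id/epoch and reports isTransactional(), while MemoryRecords.withRecords does not; the class name TransactionalBatchExample and the printed values are illustrative.

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;

public final class TransactionalBatchExample {

    public static void main(String[] args) {
        SimpleRecord record = new SimpleRecord(
            "k".getBytes(StandardCharsets.UTF_8),
            "v".getBytes(StandardCharsets.UTF_8)
        );

        // Non-transactional batch: producer id defaults to RecordBatch.NO_PRODUCER_ID (-1).
        MemoryRecords plain = MemoryRecords.withRecords(0L, CompressionType.NONE, record);

        // Transactional batch: carries the producer id/epoch, as asserted by
        // testTransactionalWriteRecords above (producer id 100, epoch 50).
        MemoryRecords transactional = MemoryRecords.withTransactionalRecords(
            0L,                                     // base offset
            CompressionType.NONE,
            100L,                                   // producer id
            (short) 50,                             // producer epoch
            0,                                      // base sequence
            RecordBatch.NO_PARTITION_LEADER_EPOCH,
            record
        );

        for (MutableRecordBatch batch : plain.batches()) {
            System.out.println(batch.producerId() + " " + batch.isTransactional());   // -1 false
        }
        for (MutableRecordBatch batch : transactional.batches()) {
            System.out.println(batch.producerId() + " " + batch.isTransactional());   // 100 true
        }
    }
}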