diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index ee0cf18212a20..c08e20b98cdaa 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -768,6 +768,17 @@ private void freeCurrentBatch() {
         private void flushCurrentBatch() {
             if (currentBatch != null) {
                 try {
+                    if (currentBatch.builder.numRecords() == 0) {
+                        // The only way we can get here is if append() has failed in an unexpected
+                        // way and left an empty batch. Try to clean it up.
+                        log.debug("Tried to flush an empty batch for {}.", tp);
+                        // There should not be any deferred events attached to the batch. We fail
+                        // the batch just in case. As a side effect, coordinator state is also
+                        // reverted, but there should be no changes since the batch was empty.
+                        failCurrentBatch(new IllegalStateException("Record batch was empty"));
+                        return;
+                    }
+
                     long flushStartMs = time.milliseconds();
                     // Write the records to the log and update the last written offset.
                     long offset = partitionWriter.append(
@@ -926,7 +937,7 @@ private void append(
                 // If the records are empty, it was a read operation after all. In this case,
                 // the response can be returned directly iff there are no pending write operations;
                 // otherwise, the read needs to wait on the last write operation to be completed.
-                if (currentBatch != null) {
+                if (currentBatch != null && currentBatch.builder.numRecords() > 0) {
                     currentBatch.deferredEvents.add(event);
                 } else {
                     if (coordinator.lastCommittedOffset() < coordinator.lastWrittenOffset()) {
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index 40d059fa3d6ec..a2f25b24a4c93 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -54,6 +54,7 @@
 import org.junit.jupiter.params.provider.EnumSource;
 import org.mockito.ArgumentMatcher;
 
+import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.time.Duration;
@@ -101,7 +102,7 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-@SuppressWarnings("checkstyle:JavaNCSS")
+@SuppressWarnings({"checkstyle:JavaNCSS", "checkstyle:ClassDataAbstractionCoupling"})
 public class CoordinatorRuntimeTest {
     private static final TopicPartition TP = new TopicPartition("__consumer_offsets", 0);
     private static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofMillis(5);
@@ -120,6 +121,34 @@ public byte[] serializeValue(String record) {
         }
     }
 
+    private static class ThrowingSerializer<T> implements Serializer<T> {
+        private final Serializer<T> serializer;
+        private boolean throwOnNextOperation;
+
+        public ThrowingSerializer(Serializer<T> serializer) {
+            this.serializer = serializer;
+            this.throwOnNextOperation = false;
+        }
+
+        public void throwOnNextOperation() {
+            throwOnNextOperation = true;
+        }
+
+        @Override
+        public byte[] serializeKey(T record) {
+            return serializer.serializeKey(record);
+        }
+
+        @Override
+        public byte[] serializeValue(T record) {
+            if (throwOnNextOperation) {
+                throwOnNextOperation = false;
+                throw new BufferOverflowException();
+            }
+            return serializer.serializeValue(record);
+        }
+    }
+
     /**
      * A CoordinatorEventProcessor that directly executes the operations. This is
      * useful in unit tests where execution in threads is not required.
      */
@@ -270,6 +299,10 @@ public long append(
             if (batch.sizeInBytes() > config(tp).maxMessageSize())
                 throw new RecordTooLargeException("Batch is larger than the max message size");
 
+            // We don't want the coordinator to write empty batches.
+            if (batch.validBytes() <= 0)
+                throw new KafkaException("Coordinator tried to write an empty batch");
+
             if (writeCount.incrementAndGet() > maxWrites)
                 throw new KafkaException("Maximum number of writes reached");
 
@@ -4213,6 +4246,73 @@ public void testScheduleNonAtomicWriteOperationWhenWriteFails() {
         assertEquals(Collections.emptyList(), writer.entries(TP));
     }
 
+    @Test
+    public void testEmptyBatch() throws Exception {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+        ThrowingSerializer<String> serializer = new ThrowingSerializer<>(new StringSerializer());
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(serializer)
+                .withAppendLingerMs(10)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertNull(ctx.currentBatch);
+
+        // Write #1, which fails.
+        serializer.throwOnNextOperation();
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(List.of("1"), "response1"));
+
+        // Write #1 should fail and leave an empty batch.
+        assertFutureThrows(write1, BufferOverflowException.class);
+        assertNotNull(ctx.currentBatch);
+
+        // Write #2, with no records.
+        CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(Collections.emptyList(), "response2"));
+
+        // Write #2 should not be attached to the empty batch.
+        assertTrue(write2.isDone());
+        assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
+
+        // Complete transaction #1. It will flush the current empty batch.
+        // The coordinator must not try to write an empty batch, otherwise the mock partition writer
+        // will throw an exception.
+        CompletableFuture<Void> complete1 = runtime.scheduleTransactionCompletion(
+            "complete#1",
+            TP,
+            100L,
+            (short) 50,
+            10,
+            TransactionResult.COMMIT,
+            DEFAULT_WRITE_TIMEOUT
+        );
+
+        // Verify that the completion is not committed yet.
+        assertFalse(complete1.isDone());
+
+        // Commit and verify that writes are completed.
+        writer.commit(TP);
+        assertNull(complete1.get(5, TimeUnit.SECONDS));
+    }
+
     @Test
     public void testRecordFlushTime() throws Exception {
         MockTimer timer = new MockTimer();
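
---

Reviewer note: below is a minimal, self-contained sketch of the failure sequence this patch guards against, for readers following it outside the runtime. All names here (EmptyBatchSketch, Batch, appendRecord, the List<CompletableFuture> deferred events) are simplified stand-ins for the runtime's internals (CoordinatorBatch, append(), flushCurrentBatch()), not the actual API.

import java.nio.BufferOverflowException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class EmptyBatchSketch {
    // Simplified stand-in for CoordinatorBatch: counts appended records and
    // holds the deferred events (write futures) waiting on the batch flush.
    static class Batch {
        int numRecords = 0;
        final List<CompletableFuture<Void>> deferredEvents = new ArrayList<>();
    }

    static Batch currentBatch;

    // Stand-in for append(): the batch is allocated first, then the record is
    // serialized. If serialization throws (e.g. BufferOverflowException, as in
    // the ThrowingSerializer test above), the batch is left holding zero records.
    static void appendRecord(boolean serializerThrows) {
        if (currentBatch == null) {
            currentBatch = new Batch();
        }
        if (serializerThrows) {
            throw new BufferOverflowException();
        }
        currentBatch.numRecords++;
    }

    // Stand-in for flushCurrentBatch() with the patch applied: an empty batch
    // is failed and discarded instead of being handed to the partition writer.
    static void flushCurrentBatch() {
        if (currentBatch == null) return;
        if (currentBatch.numRecords == 0) {
            RuntimeException cause = new IllegalStateException("Record batch was empty");
            currentBatch.deferredEvents.forEach(f -> f.completeExceptionally(cause));
            currentBatch = null;
            return;
        }
        // ... write the batch to the log and complete the deferred events ...
        currentBatch = null;
    }

    public static void main(String[] args) {
        try {
            appendRecord(true); // serializer throws, leaving an empty batch behind
        } catch (BufferOverflowException e) {
            System.out.println("append failed: " + e);
        }
        flushCurrentBatch(); // guard trips; nothing reaches the partition writer
        System.out.println("batch discarded: " + (currentBatch == null));
    }
}

As the patch's comments explain, the empty batch is failed rather than silently dropped: failing it reuses the existing cleanup path, which also reverts in-memory coordinator state, and that revert is a no-op because the batch held no records.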