Skip to content

Commit

Permalink
KAFKA-18200; Handle empty batches in coordinator runtime (apache#18144)
Browse files Browse the repository at this point in the history
* Avoid attaching empty writes to empty batches.
* Handle flushes of empty batches, which would return a 0 offset otherwise.

Reviewers: David Jacot <[email protected]>
  • Loading branch information
squah-confluent authored Dec 17, 2024
1 parent cd5a7ee commit 18f17ed
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,17 @@ private void freeCurrentBatch() {
private void flushCurrentBatch() {
if (currentBatch != null) {
try {
if (currentBatch.builder.numRecords() == 0) {
// The only way we can get here is if append() has failed in an unexpected
// way and left an empty batch. Try to clean it up.
log.debug("Tried to flush an empty batch for {}.", tp);
// There should not be any deferred events attached to the batch. We fail
// the batch just in case. As a side effect, coordinator state is also
// reverted, but there should be no changes since the batch was empty.
failCurrentBatch(new IllegalStateException("Record batch was empty"));
return;
}

long flushStartMs = time.milliseconds();
// Write the records to the log and update the last written offset.
long offset = partitionWriter.append(
Expand Down Expand Up @@ -926,7 +937,7 @@ private void append(
// If the records are empty, it was a read operation after all. In this case,
// the response can be returned directly iff there are no pending write operations;
// otherwise, the read needs to wait on the last write operation to be completed.
if (currentBatch != null) {
if (currentBatch != null && currentBatch.builder.numRecords() > 0) {
currentBatch.deferredEvents.add(event);
} else {
if (coordinator.lastCommittedOffset() < coordinator.lastWrittenOffset()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.ArgumentMatcher;

import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.time.Duration;
Expand Down Expand Up @@ -101,7 +102,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@SuppressWarnings("checkstyle:JavaNCSS")
@SuppressWarnings({"checkstyle:JavaNCSS", "checkstyle:ClassDataAbstractionCoupling"})
public class CoordinatorRuntimeTest {
private static final TopicPartition TP = new TopicPartition("__consumer_offsets", 0);
private static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofMillis(5);
Expand All @@ -120,6 +121,34 @@ public byte[] serializeValue(String record) {
}
}

private static class ThrowingSerializer<T> implements Serializer<T> {
private final Serializer<T> serializer;
private boolean throwOnNextOperation;

public ThrowingSerializer(Serializer<T> serializer) {
this.serializer = serializer;
this.throwOnNextOperation = false;
}

public void throwOnNextOperation() {
throwOnNextOperation = true;
}

@Override
public byte[] serializeKey(T record) {
return serializer.serializeKey(record);
}

@Override
public byte[] serializeValue(T record) {
if (throwOnNextOperation) {
throwOnNextOperation = false;
throw new BufferOverflowException();
}
return serializer.serializeValue(record);
}
}

/**
* A CoordinatorEventProcessor that directly executes the operations. This is
* useful in unit tests where execution in threads is not required.
Expand Down Expand Up @@ -270,6 +299,10 @@ public long append(
if (batch.sizeInBytes() > config(tp).maxMessageSize())
throw new RecordTooLargeException("Batch is larger than the max message size");

// We don't want the coordinator to write empty batches.
if (batch.validBytes() <= 0)
throw new KafkaException("Coordinator tried to write an empty batch");

if (writeCount.incrementAndGet() > maxWrites)
throw new KafkaException("Maximum number of writes reached");

Expand Down Expand Up @@ -4213,6 +4246,73 @@ public void testScheduleNonAtomicWriteOperationWhenWriteFails() {
assertEquals(Collections.emptyList(), writer.entries(TP));
}

@Test
public void testEmptyBatch() throws Exception {
MockTimer timer = new MockTimer();
MockPartitionWriter writer = new MockPartitionWriter();
ThrowingSerializer<String> serializer = new ThrowingSerializer<String>(new StringSerializer());

CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time())
.withTimer(timer)
.withDefaultWriteTimeOut(Duration.ofMillis(20))
.withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(serializer)
.withAppendLingerMs(10)
.withExecutorService(mock(ExecutorService.class))
.build();

// Schedule the loading.
runtime.scheduleLoadOperation(TP, 10);

// Verify the initial state.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertNull(ctx.currentBatch);

// Write #1, which fails.
serializer.throwOnNextOperation();
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(List.of("1"), "response1"));

// Write #1 should fail and leave an empty batch.
assertFutureThrows(write1, BufferOverflowException.class);
assertNotNull(ctx.currentBatch);

// Write #2, with no records.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(Collections.emptyList(), "response2"));

// Write #2 should not be attached to the empty batch.
assertTrue(write2.isDone());
assertEquals("response2", write2.get(5, TimeUnit.SECONDS));

// Complete transaction #1. It will flush the current empty batch.
// The coordinator must not try to write an empty batch, otherwise the mock partition writer
// will throw an exception.
CompletableFuture<Void> complete1 = runtime.scheduleTransactionCompletion(
"complete#1",
TP,
100L,
(short) 50,
10,
TransactionResult.COMMIT,
DEFAULT_WRITE_TIMEOUT
);

// Verify that the completion is not committed yet.
assertFalse(complete1.isDone());

// Commit and verify that writes are completed.
writer.commit(TP);
assertNull(complete1.get(5, TimeUnit.SECONDS));
}

@Test
public void testRecordFlushTime() throws Exception {
MockTimer timer = new MockTimer();
Expand Down

0 comments on commit 18f17ed

Please sign in to comment.