From 638844f833b165d6f9ca52c173858d26b7254fac Mon Sep 17 00:00:00 2001 From: David Jacot Date: Wed, 12 Jun 2024 08:29:50 +0200 Subject: [PATCH] KAFKA-16770; [2/2] Coalesce records into bigger batches (#16215) This patch is the continuation of https://github.com/apache/kafka/pull/15964. It introduces record coalescing to the CoordinatorRuntime. It also introduces a new configuration `group.coordinator.append.linger.ms` which allows administrators to choose the linger time or disable it by setting it to zero. The new configuration defaults to 10ms. Reviewers: Jeff Kim, Justine Olshan --- .../common/record/MemoryRecordsBuilder.java | 12 + .../scala/kafka/server/BrokerServer.scala | 1 + .../main/scala/kafka/server/KafkaConfig.scala | 2 + .../group/GroupCoordinatorConfig.java | 12 + .../group/GroupCoordinatorService.java | 2 +- .../group/runtime/CoordinatorRuntime.java | 672 ++++++++++++---- .../group/GroupCoordinatorConfigTest.java | 3 + .../group/GroupCoordinatorServiceTest.java | 1 + .../group/runtime/CoordinatorRuntimeTest.java | 736 +++++++++++++++++- .../kafka/server/util/timer/TimerTask.java | 4 + 10 files changed, 1263 insertions(+), 182 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index 70f279a6c29bc..b37b1f1ca6851 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -870,6 +870,18 @@ public boolean hasRoomFor(long timestamp, ByteBuffer key, ByteBuffer value, Head return this.writeLimit >= estimatedBytesWritten() + recordSize; } + /** + * Check if we have room for a given number of bytes.
+ */ + public boolean hasRoomFor(int estimatedRecordsSize) { + if (isFull()) return false; + return this.writeLimit >= estimatedBytesWritten() + estimatedRecordsSize; + } + + public int maxAllowedBytes() { + return this.writeLimit - this.batchHeaderSizeInBytes; + } + public boolean isClosed() { return builtRecords != null; } diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 31db58c077889..7e225440abf61 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -570,6 +570,7 @@ class BrokerServer( val serde = new CoordinatorRecordSerde val groupCoordinatorConfig = new GroupCoordinatorConfig( config.groupCoordinatorNumThreads, + config.groupCoordinatorAppendLingerMs, config.consumerGroupSessionTimeoutMs, config.consumerGroupHeartbeatIntervalMs, config.consumerGroupMaxSize, diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index ad8635f7ce780..db96bcb6762cb 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -278,6 +278,7 @@ object KafkaConfig { .define(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST, GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT, ValidList.in(Utils.enumOptions(classOf[GroupType]):_*), MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC) .define(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, INT, GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_DEFAULT, atLeast(1), MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_DOC) + .define(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, INT, GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_DOC) // Internal configuration used by integration and system tests. .defineInternal(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, BOOLEAN, GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_DEFAULT, null, MEDIUM, GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_DOC) @@ -965,6 +966,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val isNewGroupCoordinatorEnabled = getBoolean(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG) || groupCoordinatorRebalanceProtocols.contains(GroupType.CONSUMER) val groupCoordinatorNumThreads = getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG) + val groupCoordinatorAppendLingerMs = getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG) /** Consumer group configs */ val consumerGroupSessionTimeoutMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index 84219aa46ffd0..cc86b6cf818e3 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -57,6 +57,10 @@ public class GroupCoordinatorConfig { Arrays.stream(Group.GroupType.values()).map(Group.GroupType::toString).collect(Collectors.joining(",")) + ". 
" + "The " + Group.GroupType.CONSUMER + " rebalance protocol is in early access and therefore must not be used in production."; public static final List GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = Collections.singletonList(Group.GroupType.CLASSIC.toString()); + public final static String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG = "group.coordinator.append.linger.ms"; + public final static String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The duration in milliseconds that the coordinator will " + + "wait for writes to accumulate before flushing them to disk. Transactional writes are not accumulated."; + public final static int GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT = 10; public final static String GROUP_COORDINATOR_NUM_THREADS_CONFIG = "group.coordinator.threads"; public final static String GROUP_COORDINATOR_NUM_THREADS_DOC = "The number of threads used by the group coordinator."; @@ -164,6 +168,12 @@ public class GroupCoordinatorConfig { */ public final int numThreads; + /** + * The duration in milliseconds that the coordinator will wait for writes to + * accumulate before flushing them to disk. + */ + public final int appendLingerMs; + /** * The consumer group session timeout in milliseconds. */ @@ -259,6 +269,7 @@ public class GroupCoordinatorConfig { public GroupCoordinatorConfig( int numThreads, + int appendLingerMs, int consumerGroupSessionTimeoutMs, int consumerGroupHeartbeatIntervalMs, int consumerGroupMaxSize, @@ -277,6 +288,7 @@ public GroupCoordinatorConfig( CompressionType compressionType ) { this.numThreads = numThreads; + this.appendLingerMs = appendLingerMs; this.consumerGroupSessionTimeoutMs = consumerGroupSessionTimeoutMs; this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs; this.consumerGroupMaxSize = consumerGroupMaxSize; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index 5e4e899faa681..f92594f09d07e 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -182,12 +182,12 @@ public GroupCoordinatorService build() { .withPartitionWriter(writer) .withLoader(loader) .withCoordinatorShardBuilderSupplier(supplier) - .withTime(time) .withDefaultWriteTimeOut(Duration.ofMillis(config.offsetCommitTimeoutMs)) .withCoordinatorRuntimeMetrics(coordinatorRuntimeMetrics) .withCoordinatorMetrics(groupCoordinatorMetrics) .withSerializer(new CoordinatorRecordSerde()) .withCompression(Compression.of(config.compressionType).build()) + .withAppendLingerMs(config.appendLingerMs) .build(); return new GroupCoordinatorService( diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java index 4207c94770b96..57927867c89ef 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java @@ -24,11 +24,13 @@ import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.AbstractRecords; import 
org.apache.kafka.common.record.ControlRecordType; import org.apache.kafka.common.record.EndTransactionMarker; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MemoryRecordsBuilder; import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.SimpleRecord; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.requests.TransactionResult; import org.apache.kafka.common.utils.BufferSupplier; @@ -50,10 +52,12 @@ import java.nio.ByteBuffer; import java.time.Duration; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.OptionalInt; import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; @@ -66,8 +70,6 @@ import java.util.function.Consumer; import java.util.stream.Collectors; -import static org.apache.kafka.common.record.Record.EMPTY_HEADERS; - /** * The CoordinatorRuntime provides a framework to implement coordinators such as the group coordinator * or the transaction coordinator. @@ -115,6 +117,7 @@ public static class Builder, U> { private CoordinatorMetrics coordinatorMetrics; private Serializer serializer; private Compression compression; + private int appendLingerMs; public Builder withLogPrefix(String logPrefix) { this.logPrefix = logPrefix; @@ -181,6 +184,11 @@ public Builder withCompression(Compression compression) { return this; } + public Builder withAppendLingerMs(int appendLingerMs) { + this.appendLingerMs = appendLingerMs; + return this; + } + public CoordinatorRuntime build() { if (logPrefix == null) logPrefix = ""; @@ -206,6 +214,8 @@ public CoordinatorRuntime build() { throw new IllegalArgumentException("Serializer must be set."); if (compression == null) compression = Compression.NONE; + if (appendLingerMs < 0) + throw new IllegalArgumentException("AppendLinger must be >= 0"); return new CoordinatorRuntime<>( logPrefix, @@ -220,7 +230,8 @@ public CoordinatorRuntime build() { runtimeMetrics, coordinatorMetrics, serializer, - compression + compression, + appendLingerMs ); } } @@ -275,7 +286,7 @@ boolean canTransitionFrom(CoordinatorState state) { FAILED { @Override boolean canTransitionFrom(CoordinatorState state) { - return state == LOADING; + return state == LOADING || state == ACTIVE; } }; @@ -434,6 +445,81 @@ public int size() { } } + /** + * A simple container class to hold all the attributes + * related to a pending batch. + */ + private static class CoordinatorBatch { + /** + * The base (or first) offset of the batch. If the batch fails + * for any reason, the state machine is rolled back to it. + */ + final long baseOffset; + + /** + * The time at which the batch was created. + */ + final long appendTimeMs; + + /** + * The max batch size. + */ + final int maxBatchSize; + + /** + * The verification guard associated with the batch if it is + * transactional. + */ + final VerificationGuard verificationGuard; + + /** + * The byte buffer backing the records builder. + */ + final ByteBuffer buffer; + + /** + * The records builder. + */ + final MemoryRecordsBuilder builder; + + /** + * The timer used to enforce the append linger time if + * it is non-zero. + */ + final Optional lingerTimeoutTask; + + /** + * The list of deferred events associated with the batch. + */ + final List deferredEvents; + + /** + * The next offset. This is updated when records + * are added to the batch.
+ */ + long nextOffset; + + CoordinatorBatch( + long baseOffset, + long appendTimeMs, + int maxBatchSize, + VerificationGuard verificationGuard, + ByteBuffer buffer, + MemoryRecordsBuilder builder, + Optional lingerTimeoutTask + ) { + this.baseOffset = baseOffset; + this.nextOffset = baseOffset; + this.appendTimeMs = appendTimeMs; + this.maxBatchSize = maxBatchSize; + this.verificationGuard = verificationGuard; + this.buffer = buffer; + this.builder = builder; + this.lingerTimeoutTask = lingerTimeoutTask; + this.deferredEvents = new ArrayList<>(); + } + } + /** * CoordinatorContext holds all the metadata around a coordinator state machine. */ @@ -493,6 +579,11 @@ class CoordinatorContext { */ BufferSupplier bufferSupplier; + /** + * The current (or pending) batch. + */ + CoordinatorBatch currentBatch; + /** * Constructor. * @@ -547,6 +638,7 @@ private void transitionTo( .build(), tp ); + load(); break; case ACTIVE: @@ -573,6 +665,46 @@ private void transitionTo( runtimeMetrics.recordPartitionStateChange(oldState, state); } + /** + * Loads the coordinator. + */ + private void load() { + if (state != CoordinatorState.LOADING) { + throw new IllegalStateException("Coordinator must be in loading state"); + } + + loader.load(tp, coordinator).whenComplete((summary, exception) -> { + scheduleInternalOperation("CompleteLoad(tp=" + tp + ", epoch=" + epoch + ")", tp, () -> { + CoordinatorContext context = coordinators.get(tp); + if (context != null) { + if (context.state != CoordinatorState.LOADING) { + log.info("Ignored load completion from {} because context is in {} state.", + context.tp, context.state); + return; + } + try { + if (exception != null) throw exception; + context.transitionTo(CoordinatorState.ACTIVE); + if (summary != null) { + runtimeMetrics.recordPartitionLoadSensor(summary.startTimeMs(), summary.endTimeMs()); + log.info("Finished loading of metadata from {} with epoch {} in {}ms where {}ms " + + "was spent in the scheduler. Loaded {} records which total to {} bytes.", + tp, epoch, summary.endTimeMs() - summary.startTimeMs(), + summary.schedulerQueueTimeMs(), summary.numRecords(), summary.numBytes()); + } + } catch (Throwable ex) { + log.error("Failed to load metadata from {} with epoch {} due to {}.", + tp, epoch, ex.toString()); + context.transitionTo(CoordinatorState.FAILED); + } + } else { + log.debug("Failed to complete the loading of metadata for {} in epoch {} since the coordinator does not exist.", + tp, epoch); + } + }); + }); + } + /** * Unloads the coordinator. */ @@ -583,11 +715,352 @@ private void unload() { } timer.cancelAll(); deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception()); + failCurrentBatch(Errors.NOT_COORDINATOR.exception()); if (coordinator != null) { coordinator.onUnloaded(); } coordinator = null; } + + /** + * Frees the current batch. + */ + private void freeCurrentBatch() { + // Cancel the linger timeout. + currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel); + + // Release the buffer. + bufferSupplier.release(currentBatch.buffer); + + currentBatch = null; + } + + /** + * Flushes the current (or pending) batch to the log. When the batch is written + * locally, a new snapshot is created in the snapshot registry and the events + * associated with the batch are added to the deferred event queue. + */ + private void flushCurrentBatch() { + if (currentBatch != null) { + try { + // Write the records to the log and update the last written offset. 
+ long offset = partitionWriter.append( + tp, + currentBatch.verificationGuard, + currentBatch.builder.build() + ); + coordinator.updateLastWrittenOffset(offset); + + if (offset != currentBatch.nextOffset) { + log.error("The state machine of the coordinator {} is out of sync with the underlying log. " + + "The last written offset returned is {} while the coordinator expected {}. The coordinator " + + "will be reloaded in order to re-synchronize the state machine.", + tp, offset, currentBatch.nextOffset); + // Transition to FAILED state to unload the state machine and complete + // exceptionally all the pending operations. + transitionTo(CoordinatorState.FAILED); + // Transition to LOADING to trigger the restoration of the state. + transitionTo(CoordinatorState.LOADING); + // Throw NotCoordinatorException to fail the operation that + // triggered the write. We use NotCoordinatorException to be + // consistent with the transition to FAILED. + throw Errors.NOT_COORDINATOR.exception(); + } + + // Add all the pending deferred events to the deferred event queue. + for (DeferredEvent event : currentBatch.deferredEvents) { + deferredEventQueue.add(offset, event); + } + + // Free up the current batch. + freeCurrentBatch(); + } catch (Throwable t) { + log.error("Writing records to {} failed due to: {}.", tp, t.getMessage()); + failCurrentBatch(t); + // We rethrow the exception for the caller to handle it too. + throw t; + } + } + } + + /** + * Flushes the current batch if it is transactional or if it has passed the append linger time. + */ + private void maybeFlushCurrentBatch(long currentTimeMs) { + if (currentBatch != null) { + if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs) { + flushCurrentBatch(); + } + } + } + + /** + * Fails the current batch, reverts the snapshot to the base/start offset of the + * batch, and fails all the associated events. + */ + private void failCurrentBatch(Throwable t) { + if (currentBatch != null) { + coordinator.revertLastWrittenOffset(currentBatch.baseOffset); + for (DeferredEvent event : currentBatch.deferredEvents) { + event.complete(t); + } + freeCurrentBatch(); + } + } + + /** + * Allocates a new batch if none already exists. + */ + private void maybeAllocateNewBatch( + long producerId, + short producerEpoch, + VerificationGuard verificationGuard, + long currentTimeMs + ) { + if (currentBatch == null) { + LogConfig logConfig = partitionWriter.config(tp); + byte magic = logConfig.recordVersion().value; + int maxBatchSize = logConfig.maxMessageSize(); + long prevLastWrittenOffset = coordinator.lastWrittenOffset(); + ByteBuffer buffer = bufferSupplier.get(maxBatchSize); + + MemoryRecordsBuilder builder = new MemoryRecordsBuilder( + buffer, + magic, + compression, + TimestampType.CREATE_TIME, + 0L, + currentTimeMs, + producerId, + producerEpoch, + 0, + producerId != RecordBatch.NO_PRODUCER_ID, + false, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + maxBatchSize + ); + + Optional lingerTimeoutTask = Optional.empty(); + if (appendLingerMs > 0) { + lingerTimeoutTask = Optional.of(new TimerTask(appendLingerMs) { + @Override + public void run() { + // An event to flush the batch is pushed to the front of the queue + // to ensure that the linger time is respected.
+ enqueueFirst(new CoordinatorInternalEvent("FlushBatch", tp, () -> { + if (this.isCancelled()) return; + withActiveContextOrThrow(tp, CoordinatorContext::flushCurrentBatch); + })); + } + }); + CoordinatorRuntime.this.timer.add(lingerTimeoutTask.get()); + } + + currentBatch = new CoordinatorBatch( + prevLastWrittenOffset, + currentTimeMs, + maxBatchSize, + verificationGuard, + buffer, + builder, + lingerTimeoutTask + ); + } + } + + /** + * Appends records to the log and replays them to the state machine. + * + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @param verificationGuard The verification guard. + * @param records The records to append. + * @param replay A boolean indicating whether the records + * must be replayed or not. + * @param event The event that must be completed when the + * records are written. + */ + private void append( + long producerId, + short producerEpoch, + VerificationGuard verificationGuard, + List records, + boolean replay, + DeferredEvent event + ) { + if (state != CoordinatorState.ACTIVE) { + throw new IllegalStateException("Coordinator must be active to append records"); + } + + if (records.isEmpty()) { + // If the records are empty, it was a read operation after all. In this case, + // the response can be returned directly iff there are no pending write operations; + // otherwise, the read needs to wait on the last write operation to be completed. + if (currentBatch != null) { + currentBatch.deferredEvents.add(event); + } else { + OptionalLong pendingOffset = deferredEventQueue.highestPendingOffset(); + if (pendingOffset.isPresent()) { + deferredEventQueue.add(pendingOffset.getAsLong(), event); + } else { + event.complete(null); + } + } + } else { + // If the records are not empty, first, they are applied to the state machine, + // second, they are appended to the open batch. + long currentTimeMs = time.milliseconds(); + + // If the current write operation is transactional, the current batch + // is written before proceeding with it. + if (producerId != RecordBatch.NO_PRODUCER_ID) { + // If flushing fails, we don't catch the exception in order to let + // the caller fail the current operation. + flushCurrentBatch(); + } + + // Allocate a new batch if none exists. + maybeAllocateNewBatch( + producerId, + producerEpoch, + verificationGuard, + currentTimeMs + ); + + // Prepare the records. + List recordsToAppend = new ArrayList<>(records.size()); + for (U record : records) { + recordsToAppend.add(new SimpleRecord( + currentTimeMs, + serializer.serializeKey(record), + serializer.serializeValue(record) + )); + } + + // Compute the estimated size of the records. + int estimatedSize = AbstractRecords.estimateSizeInBytes( + currentBatch.builder.magic(), + compression.type(), + recordsToAppend + ); + + // Check if the current batch has enough space. We check this before + // replaying the records in order to avoid having to revert + // changes if the records do not fit within a batch. + if (estimatedSize > currentBatch.builder.maxAllowedBytes()) { + throw new RecordTooLargeException("Message batch size is " + estimatedSize + + " bytes in append to partition " + tp + " which exceeds the maximum " + + "configured size of " + currentBatch.maxBatchSize + "."); + } + + if (!currentBatch.builder.hasRoomFor(estimatedSize)) { + // Otherwise, we write the current batch, allocate a new one and re-verify + // whether the records fit in it.
+ // If flushing fails, we don't catch the exception in order to let + // the caller fail the current operation. + flushCurrentBatch(); + maybeAllocateNewBatch( + producerId, + producerEpoch, + verificationGuard, + currentTimeMs + ); + } + + // Add the event to the list of pending events associated with the batch. + currentBatch.deferredEvents.add(event); + + try { + // Apply the records to the state machine. + if (replay) { + for (int i = 0; i < records.size(); i++) { + // We compute the offset of the record based on the last written offset. The + // coordinator is the single writer to the underlying partition so we can + // deduce it like this. + coordinator.replay( + currentBatch.nextOffset + i, + producerId, + producerEpoch, + records.get(i) + ); + } + } + + // Append to the batch. + for (SimpleRecord record : recordsToAppend) { + currentBatch.builder.append(record); + currentBatch.nextOffset++; + } + } catch (Throwable t) { + log.error("Replaying records to {} failed due to: {}.", tp, t.getMessage()); + // If an exception is thrown, we fail the entire batch. Exceptions should be + // really exceptional in this code path and they would usually be the result + // of bugs preventing records from being replayed. + failCurrentBatch(t); + } + + // Write the current batch if it is transactional or if the linger timeout + // has expired. + // If flushing fails, we don't catch the exception in order to let + // the caller fail the current operation. + maybeFlushCurrentBatch(currentTimeMs); + } + } + + /** + * Completes a transaction. + * + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @param coordinatorEpoch The coordinator epoch of the transaction coordinator. + * @param result The transaction result. + * @param event The event that must be completed when the + * control record is written. + */ + private void completeTransaction( + long producerId, + short producerEpoch, + int coordinatorEpoch, + TransactionResult result, + DeferredEvent event + ) { + if (state != CoordinatorState.ACTIVE) { + throw new IllegalStateException("Coordinator must be active to complete a transaction"); + } + + // The current batch must be written before the transaction marker is written + // in order to respect the order. + flushCurrentBatch(); + + long prevLastWrittenOffset = coordinator.lastWrittenOffset(); + try { + coordinator.replayEndTransactionMarker( + producerId, + producerEpoch, + result + ); + + long offset = partitionWriter.append( + tp, + VerificationGuard.SENTINEL, + MemoryRecords.withEndTransactionMarker( + time.milliseconds(), + producerId, + producerEpoch, + new EndTransactionMarker( + result == TransactionResult.COMMIT ? ControlRecordType.COMMIT : ControlRecordType.ABORT, + coordinatorEpoch + ) + ) + ); + coordinator.updateLastWrittenOffset(offset); + + deferredEventQueue.add(offset, event); + } catch (Throwable t) { + coordinator.revertLastWrittenOffset(prevLastWrittenOffset); + event.complete(t); + } + } } class OperationTimeout extends TimerTask { @@ -781,100 +1254,20 @@ public void run() { // Execute the operation. result = op.generateRecordsAndResult(context.coordinator.coordinator()); - if (result.records().isEmpty()) { - // If the records are empty, it was a read operation after all. In this case, - // the response can be returned directly iff there are no pending write operations; - // otherwise, the read needs to wait on the last write operation to be completed.
- OptionalLong pendingOffset = context.deferredEventQueue.highestPendingOffset(); - if (pendingOffset.isPresent()) { - context.deferredEventQueue.add(pendingOffset.getAsLong(), this); - } else { - complete(null); - } - } else { - // If the records are not empty, first, they are applied to the state machine, - // second, then are written to the partition/log, and finally, the response - // is put into the deferred event queue. - long prevLastWrittenOffset = context.coordinator.lastWrittenOffset(); - LogConfig logConfig = partitionWriter.config(tp); - byte magic = logConfig.recordVersion().value; - int maxBatchSize = logConfig.maxMessageSize(); - long currentTimeMs = time.milliseconds(); - ByteBuffer buffer = context.bufferSupplier.get(Math.min(MIN_BUFFER_SIZE, maxBatchSize)); - - try { - MemoryRecordsBuilder builder = new MemoryRecordsBuilder( - buffer, - magic, - compression, - TimestampType.CREATE_TIME, - 0L, - currentTimeMs, - producerId, - producerEpoch, - 0, - producerId != RecordBatch.NO_PRODUCER_ID, - false, - RecordBatch.NO_PARTITION_LEADER_EPOCH, - maxBatchSize - ); - - // Apply the records to the state machine and add them to the batch. - for (int i = 0; i < result.records().size(); i++) { - U record = result.records().get(i); - - if (result.replayRecords()) { - // We compute the offset of the record based on the last written offset. The - // coordinator is the single writer to the underlying partition so we can - // deduce it like this. - context.coordinator.replay( - prevLastWrittenOffset + i, - producerId, - producerEpoch, - record - ); - } - - byte[] keyBytes = serializer.serializeKey(record); - byte[] valBytes = serializer.serializeValue(record); - - if (builder.hasRoomFor(currentTimeMs, keyBytes, valBytes, EMPTY_HEADERS)) { - builder.append( - currentTimeMs, - keyBytes, - valBytes, - EMPTY_HEADERS - ); - } else { - throw new RecordTooLargeException("Message batch size is " + builder.estimatedSizeInBytes() + - " bytes in append to partition " + tp + " which exceeds the maximum " + - "configured size of " + maxBatchSize + "."); - } - } - - // Write the records to the log and update the last written - // offset. - long offset = partitionWriter.append( - tp, - verificationGuard, - builder.build() - ); - context.coordinator.updateLastWrittenOffset(offset); + // Append the records and replay them to the state machine. + context.append( + producerId, + producerEpoch, + verificationGuard, + result.records(), + result.replayRecords(), + this + ); - // Add the response to the deferred queue. - if (!future.isDone()) { - context.deferredEventQueue.add(offset, this); - operationTimeout = new OperationTimeout(tp, this, writeTimeout.toMillis()); - timer.add(operationTimeout); - } else { - complete(null); - } - } catch (Throwable t) { - context.coordinator.revertLastWrittenOffset(prevLastWrittenOffset); - complete(t); - } finally { - context.bufferSupplier.release(buffer); - } + // If the operation is not done, create an operation timeout. 
+ if (!future.isDone()) { + operationTimeout = new OperationTimeout(tp, this, writeTimeout.toMillis()); + timer.add(operationTimeout); } }); } catch (Throwable t) { @@ -1142,40 +1535,17 @@ public TopicPartition key() { public void run() { try { withActiveContextOrThrow(tp, context -> { - long prevLastWrittenOffset = context.coordinator.lastWrittenOffset(); + context.completeTransaction( + producerId, + producerEpoch, + coordinatorEpoch, + result, + this + ); - try { - context.coordinator.replayEndTransactionMarker( - producerId, - producerEpoch, - result - ); - - long offset = partitionWriter.append( - tp, - VerificationGuard.SENTINEL, - MemoryRecords.withEndTransactionMarker( - time.milliseconds(), - producerId, - producerEpoch, - new EndTransactionMarker( - result == TransactionResult.COMMIT ? ControlRecordType.COMMIT : ControlRecordType.ABORT, - coordinatorEpoch - ) - ) - ); - context.coordinator.updateLastWrittenOffset(offset); - - if (!future.isDone()) { - context.deferredEventQueue.add(offset, this); - operationTimeout = new OperationTimeout(tp, this, writeTimeout.toMillis()); - timer.add(operationTimeout); - } else { - complete(null); - } - } catch (Throwable t) { - context.coordinator.revertLastWrittenOffset(prevLastWrittenOffset); - complete(t); + if (!future.isDone()) { + operationTimeout = new OperationTimeout(tp, this, writeTimeout.toMillis()); + timer.add(operationTimeout); } }); } catch (Throwable t) { @@ -1449,6 +1819,12 @@ public void onHighWatermarkUpdated( */ private final Compression compression; + /** + * The duration in milliseconds that the coordinator will wait for writes to + * accumulate before flushing them to disk. + */ + private final int appendLingerMs; + /** * Atomic boolean indicating whether the runtime is running. */ @@ -1475,7 +1851,9 @@ public void onHighWatermarkUpdated( * @param coordinatorMetrics The coordinator metrics. * @param serializer The serializer. * @param compression The compression codec. + * @param appendLingerMs The append linger time in ms. */ + @SuppressWarnings("checkstyle:ParameterNumber") private CoordinatorRuntime( String logPrefix, LogContext logContext, @@ -1489,7 +1867,8 @@ private CoordinatorRuntime( CoordinatorRuntimeMetrics runtimeMetrics, CoordinatorMetrics coordinatorMetrics, Serializer serializer, - Compression compression + Compression compression, + int appendLingerMs ) { this.logPrefix = logPrefix; this.logContext = logContext; @@ -1506,6 +1885,7 @@ private CoordinatorRuntime( this.coordinatorMetrics = coordinatorMetrics; this.serializer = serializer; this.compression = compression; + this.appendLingerMs = appendLingerMs; } /** @@ -1836,36 +2216,6 @@ public void scheduleLoadOperation( case FAILED: case INITIAL: context.transitionTo(CoordinatorState.LOADING); - loader.load(tp, context.coordinator).whenComplete((summary, exception) -> { - scheduleInternalOperation("CompleteLoad(tp=" + tp + ", epoch=" + partitionEpoch + ")", tp, () -> { - CoordinatorContext ctx = coordinators.get(tp); - if (ctx != null) { - if (ctx.state != CoordinatorState.LOADING) { - log.info("Ignored load completion from {} because context is in {} state.", - ctx.tp, ctx.state); - return; - } - try { - if (exception != null) throw exception; - ctx.transitionTo(CoordinatorState.ACTIVE); - if (summary != null) { - runtimeMetrics.recordPartitionLoadSensor(summary.startTimeMs(), summary.endTimeMs()); - log.info("Finished loading of metadata from {} with epoch {} in {}ms where {}ms " + - "was spent in the scheduler. 
Loaded {} records which total to {} bytes.", - tp, partitionEpoch, summary.endTimeMs() - summary.startTimeMs(), - summary.schedulerQueueTimeMs(), summary.numRecords(), summary.numBytes()); - } - } catch (Throwable ex) { - log.error("Failed to load metadata from {} with epoch {} due to {}.", - tp, partitionEpoch, ex.toString()); - ctx.transitionTo(CoordinatorState.FAILED); - } - } else { - log.debug("Failed to complete the loading of metadata for {} in epoch {} since the coordinator does not exist.", - tp, partitionEpoch); - } - }); - }); break; case LOADING: diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java index 03306c9040743..b65ceda74ed0b 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java @@ -30,6 +30,7 @@ public class GroupCoordinatorConfigTest { public void testConfigs() { ConsumerGroupPartitionAssignor assignor = new RangeAssignor(); GroupCoordinatorConfig config = new GroupCoordinatorConfig( + 10, 10, 30, 10, @@ -65,6 +66,7 @@ public void testConfigs() { assertEquals(24 * 60 * 60 * 1000L, config.offsetsRetentionMs); assertEquals(5000, config.offsetCommitTimeoutMs); assertEquals(CompressionType.GZIP, config.compressionType); + assertEquals(10, config.appendLingerMs); } public static GroupCoordinatorConfig createGroupCoordinatorConfig( @@ -74,6 +76,7 @@ public static GroupCoordinatorConfig createGroupCoordinatorConfig( ) { return new GroupCoordinatorConfig( 1, + 10, 45, 5, Integer.MAX_VALUE, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index 6cd96458c647d..7ddb04a6d8b68 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -118,6 +118,7 @@ private CoordinatorRuntime mockRuntime private GroupCoordinatorConfig createConfig() { return new GroupCoordinatorConfig( 1, + 10, 45, 5, Integer.MAX_VALUE, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java index ae1d404792406..5052881d3c0ba 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.errors.NotCoordinatorException; import org.apache.kafka.common.errors.NotEnoughReplicasException; +import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.AbstractRecords; @@ -60,8 +61,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.Deque; -import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Objects; @@ 
-74,6 +75,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.ACTIVE; @@ -85,6 +87,8 @@ import static org.apache.kafka.test.TestUtils.assertFutureThrows; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -147,7 +151,7 @@ public void close() throws Exception {} * when poll() is called. */ private static class ManualEventProcessor implements CoordinatorEventProcessor { - private Deque queue = new LinkedList<>(); + private final Deque queue = new LinkedList<>(); @Override public void enqueueLast(CoordinatorEvent event) throws RejectedExecutionException { @@ -274,9 +278,72 @@ public long append( * A simple Coordinator implementation that stores the records into a set. */ static class MockCoordinatorShard implements CoordinatorShard { + static class RecordAndMetadata { + public final long offset; + public final long producerId; + public final short producerEpoch; + public final String record; + + public RecordAndMetadata( + long offset, + String record + ) { + this( + offset, + RecordBatch.NO_PRODUCER_ID, + RecordBatch.NO_PRODUCER_EPOCH, + record + ); + } + + public RecordAndMetadata( + long offset, + long producerId, + short producerEpoch, + String record + ) { + this.offset = offset; + this.producerId = producerId; + this.producerEpoch = producerEpoch; + this.record = record; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + RecordAndMetadata that = (RecordAndMetadata) o; + + if (offset != that.offset) return false; + if (producerId != that.producerId) return false; + if (producerEpoch != that.producerEpoch) return false; + return Objects.equals(record, that.record); + } + + @Override + public int hashCode() { + int result = (int) (offset ^ (offset >>> 32)); + result = 31 * result + (int) (producerId ^ (producerId >>> 32)); + result = 31 * result + (int) producerEpoch; + result = 31 * result + (record != null ? 
record.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return "RecordAndMetadata(" + + "offset=" + offset + + ", producerId=" + producerId + + ", producerEpoch=" + producerEpoch + + ", record='" + record.substring(0, 10) + '\'' + + ')'; + } + } + private final SnapshotRegistry snapshotRegistry; - private final TimelineHashSet records; - private final TimelineHashMap> pendingRecords; + private final TimelineHashSet records; + private final TimelineHashMap> pendingRecords; private final CoordinatorTimer timer; MockCoordinatorShard( @@ -296,12 +363,19 @@ public void replay( short producerEpoch, String record ) throws RuntimeException { + RecordAndMetadata recordAndMetadata = new RecordAndMetadata( + offset, + producerId, + producerEpoch, + record + ); + if (producerId == RecordBatch.NO_PRODUCER_ID) { - records.add(record); + records.add(recordAndMetadata); } else { pendingRecords .computeIfAbsent(producerId, __ -> new TimelineHashSet<>(snapshotRegistry, 0)) - .add(record); + .add(recordAndMetadata); } } @@ -312,7 +386,7 @@ public void replayEndTransactionMarker( TransactionResult result ) throws RuntimeException { if (result == TransactionResult.COMMIT) { - TimelineHashSet pending = pendingRecords.remove(producerId); + TimelineHashSet pending = pendingRecords.remove(producerId); if (pending == null) return; records.addAll(pending); } else { @@ -321,13 +395,26 @@ public void replayEndTransactionMarker( } Set pendingRecords(long producerId) { - TimelineHashSet pending = pendingRecords.get(producerId); + TimelineHashSet pending = pendingRecords.get(producerId); if (pending == null) return Collections.emptySet(); - return Collections.unmodifiableSet(new HashSet<>(pending)); + return Collections.unmodifiableSet( + pending.stream().map(record -> record.record).collect(Collectors.toSet()) + ); } Set records() { - return Collections.unmodifiableSet(new HashSet<>(records)); + return Collections.unmodifiableSet( + records.stream().map(record -> record.record).collect(Collectors.toSet()) + ); + } + + List fullRecords() { + return Collections.unmodifiableList( + records + .stream() + .sorted(Comparator.comparingLong(record -> record.offset)) + .collect(Collectors.toList()) + ); } CoordinatorTimer timer() { @@ -407,10 +494,17 @@ private static MemoryRecords records( long timestamp, String... records ) { - if (records.length == 0) + return records(timestamp, Arrays.stream(records).collect(Collectors.toList())); + } + + private static MemoryRecords records( + long timestamp, + List records + ) { + if (records.isEmpty()) return MemoryRecords.EMPTY; - List simpleRecords = Arrays.stream(records).map(record -> + List simpleRecords = records.stream().map(record -> new SimpleRecord(timestamp, record.getBytes(Charset.defaultCharset())) ).collect(Collectors.toList()); @@ -447,10 +541,24 @@ private static MemoryRecords transactionalRecords( long timestamp, String... 
records ) { - if (records.length == 0) + return transactionalRecords( + producerId, + producerEpoch, + timestamp, + Arrays.stream(records).collect(Collectors.toList()) + ); + } + + private static MemoryRecords transactionalRecords( + long producerId, + short producerEpoch, + long timestamp, + List records + ) { + if (records.isEmpty()) return MemoryRecords.EMPTY; - List simpleRecords = Arrays.stream(records).map(record -> + List simpleRecords = records.stream().map(record -> new SimpleRecord(timestamp, record.getBytes(Charset.defaultCharset())) ).collect(Collectors.toList()); @@ -986,13 +1094,13 @@ public void testScheduleWriteOp() throws ExecutionException, InterruptedExceptio // Records have been replayed to the coordinator. assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records()); // Records have been written to the log. - assertEquals(Arrays.asList( + assertEquals(Collections.singletonList( records(timer.time().milliseconds(), "record1", "record2") ), writer.entries(TP)); // Write #2. CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT, - state -> new CoordinatorResult<>(Arrays.asList("record3"), "response2")); + state -> new CoordinatorResult<>(Collections.singletonList("record3"), "response2")); // Verify that the write is not committed yet. assertFalse(write2.isDone()); @@ -1540,7 +1648,7 @@ public void testScheduleTransactionCompletion(TransactionResult result) throws E 100L )); // Records have been written to the log. - assertEquals(Arrays.asList( + assertEquals(Collections.singletonList( transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2") ), writer.entries(TP)); @@ -1785,7 +1893,7 @@ public void replayEndTransactionMarker( assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(100L)); assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records()); - assertEquals(Arrays.asList( + assertEquals(Collections.singletonList( transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2") ), writer.entries(TP)); @@ -1807,7 +1915,7 @@ public void replayEndTransactionMarker( assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(100L)); assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records()); - assertEquals(Arrays.asList( + assertEquals(Collections.singletonList( transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2") ), writer.entries(TP)); } @@ -1985,7 +2093,7 @@ public void testScheduleReadAllOp() throws ExecutionException, InterruptedExcept // Read. 
List>> responses = runtime.scheduleReadAllOperation( "read", - (state, offset) -> new ArrayList<>(state.records) + (state, offset) -> new ArrayList<>(state.records()) ); assertEquals( @@ -3059,6 +3167,594 @@ public void testAppendRecordBatchSize() { assertTrue(batchSize > MIN_BUFFER_SIZE && batchSize < maxBatchSize); } + @Test + public void testScheduleWriteOperationWithBatching() throws ExecutionException, InterruptedException, TimeoutException { + MockTimer timer = new MockTimer(); + MockPartitionWriter writer = new MockPartitionWriter(); + + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() + .withTime(timer.time()) + .withTimer(timer) + .withDefaultWriteTimeOut(Duration.ofMillis(20)) + .withLoader(new MockCoordinatorLoader()) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) + .withSerializer(new StringSerializer()) + .withAppendLingerMs(10) + .build(); + + // Schedule the loading. + runtime.scheduleLoadOperation(TP, 10); + + // Verify the initial state. + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + assertEquals(0L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertNull(ctx.currentBatch); + + // Get the max batch size. + int maxBatchSize = writer.config(TP).maxMessageSize(); + + // Create records with a quarter of the max batch size each. Keep in mind that + // each batch has a header so it is not possible to have those four records + // in one single batch. + List records = Stream.of('1', '2', '3', '4').map(c -> { + char[] payload = new char[maxBatchSize / 4]; + Arrays.fill(payload, c); + return new String(payload); + }).collect(Collectors.toList()); + + // Write #1 with two records. + CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20), + state -> new CoordinatorResult<>(records.subList(0, 2), "response1") + ); + + // Verify that the write is not committed yet. + assertFalse(write1.isDone()); + + // A batch has been created. + assertNotNull(ctx.currentBatch); + + // Verify the state. Records are replayed but no batch written. + assertEquals(0L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(Arrays.asList( + new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), + new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)) + ), ctx.coordinator.coordinator().fullRecords()); + assertEquals(Collections.emptyList(), writer.entries(TP)); + + // Write #2 with one record. + CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20), + state -> new CoordinatorResult<>(records.subList(2, 3), "response2") + ); + + // Verify that the write is not committed yet. + assertFalse(write2.isDone()); + + // Verify the state. Records are replayed but no batch written. 
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(Arrays.asList( + new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), + new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), + new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)) + ), ctx.coordinator.coordinator().fullRecords()); + assertEquals(Collections.emptyList(), writer.entries(TP)); + + // Write #3 with one record. This one cannot go into the existing batch + // so the existing batch should be flushed and a new one should be created. + CompletableFuture write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20), + state -> new CoordinatorResult<>(records.subList(3, 4), "response3") + ); + + // Verify that the write is not committed yet. + assertFalse(write3.isDone()); + + // Verify the state. Records are replayed. The previous batch + // got flushed with all the records but the new one from #3. + assertEquals(3L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(Arrays.asList(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(Arrays.asList( + new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), + new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), + new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)), + new MockCoordinatorShard.RecordAndMetadata(3, records.get(3)) + ), ctx.coordinator.coordinator().fullRecords()); + assertEquals(Collections.singletonList( + records(timer.time().milliseconds(), records.subList(0, 3)) + ), writer.entries(TP)); + + // Advance past the linger time. + timer.advanceClock(11); + + // Verify the state. The pending batch is flushed. + assertEquals(4L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(Arrays.asList(0L, 3L, 4L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(Arrays.asList( + new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), + new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), + new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)), + new MockCoordinatorShard.RecordAndMetadata(3, records.get(3)) + ), ctx.coordinator.coordinator().fullRecords()); + assertEquals(Arrays.asList( + records(timer.time().milliseconds() - 11, records.subList(0, 3)), + records(timer.time().milliseconds() - 11, records.subList(3, 4)) + ), writer.entries(TP)); + + // Commit and verify that writes are completed. 
+ writer.commit(TP); + assertTrue(write1.isDone()); + assertTrue(write2.isDone()); + assertTrue(write3.isDone()); + assertEquals("response1", write1.get(5, TimeUnit.SECONDS)); + assertEquals("response2", write2.get(5, TimeUnit.SECONDS)); + assertEquals("response3", write3.get(5, TimeUnit.SECONDS)); + } + + @Test + public void testScheduleWriteOperationWithBatchingWhenRecordsTooLarge() { + MockTimer timer = new MockTimer(); + MockPartitionWriter writer = new MockPartitionWriter(); + + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() + .withTime(timer.time()) + .withTimer(timer) + .withDefaultWriteTimeOut(Duration.ofMillis(20)) + .withLoader(new MockCoordinatorLoader()) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) + .withSerializer(new StringSerializer()) + .withAppendLingerMs(10) + .build(); + + // Schedule the loading. + runtime.scheduleLoadOperation(TP, 10); + + // Verify the initial state. + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + assertEquals(0L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertNull(ctx.currentBatch); + + // Get the max batch size. + int maxBatchSize = writer.config(TP).maxMessageSize(); + + // Create records with a quarter of the max batch size each. Keep in mind that + // each batch has a header so it is not possible to have those four records + // in one single batch. + List records = Stream.of('1', '2', '3', '4').map(c -> { + char[] payload = new char[maxBatchSize / 4]; + Arrays.fill(payload, c); + return new String(payload); + }).collect(Collectors.toList()); + + // Write all the records. + CompletableFuture write = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20), + state -> new CoordinatorResult<>(records, "response1") + ); + + assertFutureThrows(write, RecordTooLargeException.class); + } + + @Test + public void testScheduleWriteOperationWithBatchingWhenWriteFails() { + MockTimer timer = new MockTimer(); + // The partition writer only accept no writes. + MockPartitionWriter writer = new MockPartitionWriter(0); + + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() + .withTime(timer.time()) + .withTimer(timer) + .withDefaultWriteTimeOut(Duration.ofMillis(20)) + .withLoader(new MockCoordinatorLoader()) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) + .withSerializer(new StringSerializer()) + .withAppendLingerMs(10) + .build(); + + // Schedule the loading. + runtime.scheduleLoadOperation(TP, 10); + + // Verify the initial state. + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + assertEquals(0L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertNull(ctx.currentBatch); + + // Get the max batch size. 
+ int maxBatchSize = writer.config(TP).maxMessageSize(); + + // Create records with a quarter of the max batch size each. Keep in mind that + // each batch has a header so it is not possible to have those four records + // in one single batch. + List records = Stream.of('1', '2', '3', '4').map(c -> { + char[] payload = new char[maxBatchSize / 4]; + Arrays.fill(payload, c); + return new String(payload); + }).collect(Collectors.toList()); + + // Write #1. + CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20), + state -> new CoordinatorResult<>(records.subList(0, 1), "response1")); + + // Write #2. + CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20), + state -> new CoordinatorResult<>(records.subList(1, 2), "response2")); + + // Write #3. + CompletableFuture write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20), + state -> new CoordinatorResult<>(records.subList(2, 3), "response3")); + + // Verify the state. + assertEquals(0L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(Arrays.asList( + new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), + new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), + new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)) + ), ctx.coordinator.coordinator().fullRecords()); + assertEquals(Collections.emptyList(), writer.entries(TP)); + + // Write #4. This write cannot make it in the current batch. So the current batch + // is flushed. It will fail. So we expect all writes to fail. + CompletableFuture write4 = runtime.scheduleWriteOperation("write#4", TP, Duration.ofMillis(20), + state -> new CoordinatorResult<>(records.subList(3, 4), "response4")); + + // Verify the futures. + assertFutureThrows(write1, KafkaException.class); + assertFutureThrows(write2, KafkaException.class); + assertFutureThrows(write3, KafkaException.class); + // Write #4 is also expected to fail. + assertFutureThrows(write4, KafkaException.class); + + // Verify the state. The state should be reverted to the initial state. + assertEquals(0L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertEquals(Collections.emptyList(), ctx.coordinator.coordinator().fullRecords()); + assertEquals(Collections.emptyList(), writer.entries(TP)); + } + + @Test + public void testScheduleWriteOperationWithBatchingWhenReplayFails() { + MockTimer timer = new MockTimer(); + MockPartitionWriter writer = new MockPartitionWriter(); + + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() + .withTime(timer.time()) + .withTimer(timer) + .withDefaultWriteTimeOut(Duration.ofMillis(20)) + .withLoader(new MockCoordinatorLoader()) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) + .withSerializer(new StringSerializer()) + .withAppendLingerMs(10) + .build(); + + // Schedule the loading. + runtime.scheduleLoadOperation(TP, 10); + + // Verify the initial state. 
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertNull(ctx.currentBatch);
+
+ // Override the coordinator with a coordinator that throws
+ // an exception when replay is called.
+ SnapshotRegistry snapshotRegistry = ctx.coordinator.snapshotRegistry();
+ ctx.coordinator = new SnapshottableCoordinator<>(
+ new LogContext(),
+ snapshotRegistry,
+ new MockCoordinatorShard(snapshotRegistry, ctx.timer) {
+ @Override
+ public void replay(
+ long offset,
+ long producerId,
+ short producerEpoch,
+ String record
+ ) throws RuntimeException {
+ if (offset >= 1) {
+ throw new IllegalArgumentException("error");
+ }
+ super.replay(
+ offset,
+ producerId,
+ producerEpoch,
+ record
+ );
+ }
+ },
+ TP
+ );
+
+ // Get the max batch size.
+ int maxBatchSize = writer.config(TP).maxMessageSize();
+
+ // Create records with a quarter of the max batch size each.
+ List<String> records = Stream.of('1', '2').map(c -> {
+ char[] payload = new char[maxBatchSize / 4];
+ Arrays.fill(payload, c);
+ return new String(payload);
+ }).collect(Collectors.toList());
+
+ // Write #1.
+ CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(records.subList(0, 1), "response1"));
+
+ // Verify the state.
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertEquals(Collections.singletonList(
+ new MockCoordinatorShard.RecordAndMetadata(0, records.get(0))
+ ), ctx.coordinator.coordinator().fullRecords());
+ assertEquals(Collections.emptyList(), writer.entries(TP));
+
+ // Write #2. It should fail.
+ CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(records.subList(1, 2), "response2"));
+
+ // Verify the futures.
+ assertFutureThrows(write1, IllegalArgumentException.class);
+ assertFutureThrows(write2, IllegalArgumentException.class);
+
+ // Verify the state.
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertEquals(Collections.emptyList(), ctx.coordinator.coordinator().fullRecords());
+ assertEquals(Collections.emptyList(), writer.entries(TP));
+ }
+
+ @Test
+ public void testScheduleTransactionalWriteOperationWithBatching() throws ExecutionException, InterruptedException, TimeoutException {
+ MockTimer timer = new MockTimer();
+ MockPartitionWriter writer = new MockPartitionWriter();
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(Duration.ofMillis(20))
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+ .withSerializer(new StringSerializer())
+ .withAppendLingerMs(10)
+ .build();
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertNull(ctx.currentBatch);
+
+ // Write #1 with one record.
+ CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(Collections.singletonList("record#1"), "response1")
+ );
+
+ // Verify that the write is not committed yet.
+ assertFalse(write1.isDone());
+
+ // Verify the state. Records are replayed but no batch written.
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().pendingRecords(100L));
+ assertEquals(mkSet("record#1"), ctx.coordinator.coordinator().records());
+ assertEquals(Collections.emptyList(), writer.entries(TP));
+
+ // Transactional write #2 with one record. This will flush the current batch.
+ CompletableFuture<String> write2 = runtime.scheduleTransactionalWriteOperation(
+ "txn-write#1",
+ TP,
+ "transactional-id",
+ 100L,
+ (short) 50,
+ Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(Collections.singletonList("record#2"), "response2"),
+ TXN_OFFSET_COMMIT_LATEST_VERSION
+ );
+
+ // Verify that the write is not committed yet.
+ assertFalse(write2.isDone());
+
+ // Verify the state. The current batch and the transactional records are
+ // written to the log.
+ assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Arrays.asList(0L, 1L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertEquals(mkSet("record#2"), ctx.coordinator.coordinator().pendingRecords(100L));
+ assertEquals(mkSet("record#1"), ctx.coordinator.coordinator().records());
+ assertEquals(Arrays.asList(
+ records(timer.time().milliseconds(), "record#1"),
+ transactionalRecords(100L, (short) 50, timer.time().milliseconds(), "record#2")
+ ), writer.entries(TP));
+
+ // Write #3 with one record.
+ CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(Collections.singletonList("record#3"), "response3")
+ );
+
+ // Verify that the write is not committed yet.
+ assertFalse(write3.isDone());
+
+ // Verify the state.
+ assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Arrays.asList(0L, 1L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertEquals(mkSet("record#2"), ctx.coordinator.coordinator().pendingRecords(100L));
+ assertEquals(mkSet("record#1", "record#3"), ctx.coordinator.coordinator().records());
+ assertEquals(Arrays.asList(
+ records(timer.time().milliseconds(), "record#1"),
+ transactionalRecords(100L, (short) 50, timer.time().milliseconds(), "record#2")
+ ), writer.entries(TP));
+
+ // Complete transaction #1. It will flush the current batch if any.
+ CompletableFuture<Void> complete1 = runtime.scheduleTransactionCompletion(
+ "complete#1",
+ TP,
+ 100L,
+ (short) 50,
+ 10,
+ TransactionResult.COMMIT,
+ DEFAULT_WRITE_TIMEOUT
+ );
+
+ // Verify that the completion is not committed yet.
+ assertFalse(complete1.isDone());
+
+ // Verify the state.
+ assertEquals(4L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Arrays.asList(0L, 1L, 2L, 3L, 4L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().pendingRecords(100L));
+ assertEquals(mkSet("record#1", "record#2", "record#3"), ctx.coordinator.coordinator().records());
+ assertEquals(Arrays.asList(
+ records(timer.time().milliseconds(), "record#1"),
+ transactionalRecords(100L, (short) 50, timer.time().milliseconds(), "record#2"),
+ records(timer.time().milliseconds(), "record#3"),
+ endTransactionMarker(100L, (short) 50, timer.time().milliseconds(), 10, ControlRecordType.COMMIT)
+ ), writer.entries(TP));
+
+ // Commit and verify that writes are completed.
+ writer.commit(TP);
+ assertTrue(write1.isDone());
+ assertTrue(write2.isDone());
+ assertTrue(write3.isDone());
+ assertTrue(complete1.isDone());
+ assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+ assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
+ assertEquals("response3", write3.get(5, TimeUnit.SECONDS));
+ assertNull(complete1.get(5, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testStateMachineIsReloadedWhenOutOfSync() {
+ MockTimer timer = new MockTimer();
+ MockCoordinatorLoader loader = spy(new MockCoordinatorLoader());
+ MockPartitionWriter writer = new MockPartitionWriter() {
+ @Override
+ public long append(
+ TopicPartition tp,
+ VerificationGuard verificationGuard,
+ MemoryRecords batch
+ ) {
+ // Add 1 to the returned offsets.
+ return super.append(tp, verificationGuard, batch) + 1;
+ }
+ };
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(Duration.ofMillis(20))
+ .withLoader(loader)
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+ .withSerializer(new StringSerializer())
+ .withAppendLingerMs(10)
+ .build();
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify the initial state.
+ CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+ assertEquals(ACTIVE, ctx.state);
+ assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+ assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+ assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+ assertNull(ctx.currentBatch);
+
+ // Keep a reference to the current coordinator.
+ SnapshottableCoordinator<MockCoordinatorShard, String> coordinator = ctx.coordinator;
+
+ // Get the max batch size.
+ int maxBatchSize = writer.config(TP).maxMessageSize();
+
+ // Create records with a quarter of the max batch size each. Keep in mind that
+ // each batch has a header so it is not possible to have those four records
+ // in one single batch.
+ List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+ char[] payload = new char[maxBatchSize / 4];
+ Arrays.fill(payload, c);
+ return new String(payload);
+ }).collect(Collectors.toList());
+
+ // Write #1.
+ CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(records.subList(0, 1), "response1"));
+
+ // Write #2.
+ CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(records.subList(1, 2), "response2"));
+
+ // Write #3.
+ CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(records.subList(2, 3), "response3"));
+
+ // Write #4. This write does not fit in the current batch, so the current batch
+ // is flushed. The writer returns an unexpected offset, so the runtime detects
+ // that it is out of sync with the log and all pending writes are expected to fail.
+ CompletableFuture<String> write4 = runtime.scheduleWriteOperation("write#4", TP, Duration.ofMillis(20),
+ state -> new CoordinatorResult<>(records.subList(3, 4), "response4"));
+
+ // Verify the futures.
+ assertFutureThrows(write1, NotCoordinatorException.class);
+ assertFutureThrows(write2, NotCoordinatorException.class);
+ assertFutureThrows(write3, NotCoordinatorException.class);
+ // Write #4 is also expected to fail.
+ assertFutureThrows(write4, NotCoordinatorException.class);
+
+ // Verify that the state machine was loaded twice.
+ verify(loader, times(2)).load(eq(TP), any());
+
+ // Verify that the state is active and that the state machine
+ // is actually a new one.
+ assertEquals(ACTIVE, ctx.state);
+ assertNotEquals(coordinator, ctx.coordinator);
+ }
+
private static , U> ArgumentMatcher> coordinatorMatcher(
CoordinatorRuntime runtime,
TopicPartition tp
diff --git a/server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java b/server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java
index ec6e8b3d783ac..ac58b68ec5ea9 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/timer/TimerTask.java
@@ -32,6 +32,10 @@ public void cancel() {
}
}
+ public boolean isCancelled() {
+ return timerTaskEntry == null;
+ }
+
final void setTimerTaskEntry(TimerTaskEntry entry) {
synchronized (this) {
// if this timerTask is already held by an existing timer task entry,