Skip to content

Commit

Permalink
KAFKA-17482: Make share partition initialization async (KIP-932) (#17097
Browse files Browse the repository at this point in the history
)

The PR introduces states for the share partition, which are used to make share partition initialization async.

Reviewers:  Andrew Schofield <[email protected]>, Manikumar Reddy <[email protected]>
  • Loading branch information
apoorvmittal10 authored Sep 9, 2024
1 parent 049b7cd commit 4ac1dd4
Show file tree
Hide file tree
Showing 4 changed files with 643 additions and 159 deletions.
258 changes: 182 additions & 76 deletions core/src/main/java/kafka/server/share/SharePartition.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.common.errors.FencedStateEpochException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
Expand All @@ -38,7 +39,6 @@
import org.apache.kafka.server.group.share.Persister;
import org.apache.kafka.server.group.share.PersisterStateBatch;
import org.apache.kafka.server.group.share.ReadShareGroupStateParameters;
import org.apache.kafka.server.group.share.ReadShareGroupStateResult;
import org.apache.kafka.server.group.share.TopicData;
import org.apache.kafka.server.group.share.WriteShareGroupStateParameters;
import org.apache.kafka.server.share.ShareAcknowledgementBatch;
Expand All @@ -59,7 +59,6 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand All @@ -78,6 +77,31 @@ public class SharePartition {
*/
static final String EMPTY_MEMBER_ID = Uuid.ZERO_UUID.toString();

/**
* The SharePartitionState is used to track the state of the share partition. The state of the
* share partition determines if the partition is ready to receive requests, be initialized with
* persisted state, or has failed to initialize.
* <p>
* As used by maybeInitialize(), the state moves EMPTY -> INITIALIZING and then to either
* ACTIVE (persisted state loaded) or FAILED (load failed). No transition out of ACTIVE or
* FAILED is visible in this part of the file.
*/
// Visible for testing
enum SharePartitionState {
/**
* The share partition is empty and has not been initialized with persisted state.
* This is the state assigned by the constructor, and the only state from which
* maybeInitialize() starts reading from the persister.
*/
EMPTY,
/**
* The share partition is initializing with persisted state.
* Set by maybeInitialize() before the persister read is issued; a maybeInitialize() call
* that observes this state completes its future with a LeaderNotAvailableException.
*/
INITIALIZING,
/**
* The share partition is active and ready to serve requests.
* Set once the persisted state has been applied; maybeInitialize() then completes its
* future successfully without reading from the persister again.
*/
ACTIVE,
/**
* The share partition failed to initialize with persisted state.
* Set by completeInitializationWithException(); a maybeInitialize() call that observes
* this state completes its future with an IllegalStateException.
*/
FAILED
}

/**
* The RecordState is used to track the state of a record that has been fetched from the leader.
* The state of the records determines if the records should be re-delivered, move the next fetch
Expand Down Expand Up @@ -232,6 +256,11 @@ public static RecordState forId(byte id) {
*/
private int stateEpoch;

/**
* The partition state is used to track the state of the share partition.
*/
private SharePartitionState partitionState;

SharePartition(
String groupId,
TopicIdPartition topicIdPartition,
Expand All @@ -254,8 +283,126 @@ public static RecordState forId(byte id) {
this.timer = timer;
this.time = time;
this.persister = persister;
// Initialize the partition.
initialize();
this.partitionState = SharePartitionState.EMPTY;
}

/**
 * May initialize the share partition by reading the state from the persister. The share partition
 * is initialized only if it is in the EMPTY state. If the share partition is in ACTIVE state,
 * the method completes the future successfully. For other states, the method completes the future
 * with an exception, which might be re-triable.
 * <p>
 * The persister read is asynchronous: its completion callback may run on a different thread after
 * this method has returned, so the callback re-acquires the write lock before it touches any
 * share partition state.
 *
 * @return A future which is completed when the share partition is initialized, or completed
 *         exceptionally if the share partition is in a non-initializable state or the persisted
 *         state could not be loaded.
 */
public CompletableFuture<Void> maybeInitialize() {
    log.debug("Maybe initialize share partition: {}-{}", groupId, topicIdPartition);
    CompletableFuture<Void> future = new CompletableFuture<>();
    // Check if the share partition is already initialized.
    maybeCompleteInitialization(future);
    if (future.isDone()) {
        return future;
    }

    // All the pending requests should wait to get completed before the share partition is initialized.
    // Attain lock to avoid any concurrent requests to be processed.
    lock.writeLock().lock();
    try {
        // Re-check the state to verify if a previous request has already initialized the share partition.
        maybeCompleteInitialization(future);
        if (future.isDone()) {
            return future;
        }

        // Update state to initializing to avoid any concurrent requests to be processed.
        partitionState = SharePartitionState.INITIALIZING;
        // Initialize the share partition by reading the state from the persister.
        persister.readState(new ReadShareGroupStateParameters.Builder()
            .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
                .setGroupId(this.groupId)
                .setTopicsData(Collections.singletonList(new TopicData<>(topicIdPartition.topicId(),
                    Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(topicIdPartition.partition(), 0)))))
                .build())
            .build()
        ).whenComplete((result, exception) -> {
            // The callback may execute on another thread once the outer lock has been released,
            // hence guard every state mutation below with the write lock. The lock is acquired
            // re-entrantly when the persister future completes synchronously on the calling thread.
            lock.writeLock().lock();
            try {
                if (exception != null) {
                    log.error("Failed to initialize the share partition: {}-{}", groupId, topicIdPartition, exception);
                    completeInitializationWithException(future, exception);
                    return;
                }

                if (result == null || result.topicsData() == null || result.topicsData().size() != 1) {
                    log.error("Failed to initialize the share partition: {}-{}. Invalid state found: {}.",
                        groupId, topicIdPartition, result);
                    completeInitializationWithException(future, new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition)));
                    return;
                }

                TopicData<PartitionAllData> state = result.topicsData().get(0);
                // Topic ids are objects: compare by value, not by reference.
                if (!topicIdPartition.topicId().equals(state.topicId()) || state.partitions().size() != 1) {
                    log.error("Failed to initialize the share partition: {}-{}. Invalid topic partition response: {}.",
                        groupId, topicIdPartition, result);
                    completeInitializationWithException(future, new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition)));
                    return;
                }

                PartitionAllData partitionData = state.partitions().get(0);
                if (partitionData.partition() != topicIdPartition.partition()) {
                    log.error("Failed to initialize the share partition: {}-{}. Invalid partition response: {}.",
                        groupId, topicIdPartition, partitionData);
                    completeInitializationWithException(future, new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition)));
                    return;
                }

                if (partitionData.errorCode() != Errors.NONE.code()) {
                    KafkaException ex = fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage());
                    log.error("Failed to initialize the share partition: {}-{}. Exception occurred: {}.",
                        groupId, topicIdPartition, partitionData);
                    completeInitializationWithException(future, ex);
                    return;
                }

                // Set the start offset and state epoch from the persisted state. A persisted start
                // offset of -1 is treated as "not set" and normalized to 0.
                startOffset = partitionData.startOffset() != -1 ? partitionData.startOffset() : 0;
                stateEpoch = partitionData.stateEpoch();

                List<PersisterStateBatch> stateBatches = partitionData.stateBatches();
                for (PersisterStateBatch stateBatch : stateBatches) {
                    if (stateBatch.firstOffset() < startOffset) {
                        log.error("Invalid state batch found for the share partition: {}-{}. The base offset: {}"
                                + " is less than the start offset: {}.", groupId, topicIdPartition,
                            stateBatch.firstOffset(), startOffset);
                        completeInitializationWithException(future, new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition)));
                        return;
                    }
                    InFlightBatch inFlightBatch = new InFlightBatch(EMPTY_MEMBER_ID, stateBatch.firstOffset(),
                        stateBatch.lastOffset(), RecordState.forId(stateBatch.deliveryState()), stateBatch.deliveryCount(), null);
                    cachedState.put(stateBatch.firstOffset(), inFlightBatch);
                }
                // Update the endOffset of the partition.
                if (!cachedState.isEmpty()) {
                    // If the cachedState is not empty, findNextFetchOffset flag is set to true so that any AVAILABLE records
                    // in the cached state are not missed
                    findNextFetchOffset.set(true);
                    endOffset = cachedState.lastEntry().getValue().lastOffset();
                    // In case the persister read state RPC result contains no AVAILABLE records, we can update cached state
                    // and start/end offsets.
                    maybeUpdateCachedStateAndOffsets();
                } else {
                    // Use the normalized start offset so that start and end offsets agree when the
                    // persisted start offset is -1.
                    endOffset = startOffset;
                }
                // Set the partition state to Active and complete the future.
                partitionState = SharePartitionState.ACTIVE;
                future.complete(null);
            } catch (Exception e) {
                // Never leave the future pending or the partition stuck in INITIALIZING when the
                // persisted state cannot be applied (e.g. an unknown delivery state id).
                log.error("Failed to initialize the share partition: {}-{}", groupId, topicIdPartition, e);
                completeInitializationWithException(future, e);
            } finally {
                lock.writeLock().unlock();
            }
        });
    } catch (Exception e) {
        log.error("Failed to initialize the share partition: {}-{}", groupId, topicIdPartition, e);
        completeInitializationWithException(future, e);
    } finally {
        lock.writeLock().unlock();
    }

    return future;
}

/**
Expand Down Expand Up @@ -858,79 +1005,28 @@ void releaseFetchLock() {
fetchLock.set(false);
}

private void initialize() {
log.debug("Initializing share partition: {}-{}", groupId, topicIdPartition);
// Initialize the share partition by reading the state from the persister.
ReadShareGroupStateResult response;
try {
response = persister.readState(new ReadShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
.setGroupId(this.groupId)
.setTopicsData(Collections.singletonList(new TopicData<>(topicIdPartition.topicId(),
Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(topicIdPartition.partition(), 0)))))
.build())
.build()
).get();
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to initialize the share partition: {}-{}", groupId, topicIdPartition, e);
throw new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition), e);
}

if (response == null || response.topicsData() == null || response.topicsData().size() != 1) {
log.error("Failed to initialize the share partition: {}-{}. Invalid state found: {}.",
groupId, topicIdPartition, response);
throw new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition));
}

TopicData<PartitionAllData> state = response.topicsData().get(0);
if (state.topicId() != topicIdPartition.topicId() || state.partitions().size() != 1) {
log.error("Failed to initialize the share partition: {}-{}. Invalid topic partition response: {}.",
groupId, topicIdPartition, response);
throw new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition));
}

PartitionAllData partitionData = state.partitions().get(0);
if (partitionData.partition() != topicIdPartition.partition()) {
log.error("Failed to initialize the share partition: {}-{}. Invalid partition response: {}.",
groupId, topicIdPartition, partitionData);
throw new IllegalStateException(String.format("Failed to initialize the share partition %s-%s",
groupId, topicIdPartition));
}

if (partitionData.errorCode() != Errors.NONE.code()) {
KafkaException ex = fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage());
log.error("Failed to initialize the share partition: {}-{}. Exception occurred: {}.",
groupId, topicIdPartition, partitionData);
throw ex;
}

// Set the state epoch and end offset from the persisted state.
startOffset = partitionData.startOffset() != -1 ? partitionData.startOffset() : 0;
stateEpoch = partitionData.stateEpoch();
/**
 * Marks the share partition as FAILED and completes the initialization future exceptionally.
 * <p>
 * The state is written under the write lock because readers obtain it through
 * partitionState() under the read lock, and this method is also invoked from the
 * asynchronous persister callback. Callers that already hold the write lock re-acquire it
 * here (the lock is expected to be a ReentrantReadWriteLock - confirm against the field
 * declaration).
 *
 * @param future    The initialization future to fail.
 * @param exception The cause of the initialization failure.
 */
private void completeInitializationWithException(CompletableFuture<Void> future, Throwable exception) {
    lock.writeLock().lock();
    try {
        partitionState = SharePartitionState.FAILED;
    } finally {
        lock.writeLock().unlock();
    }
    // Publish the failure only after the state has been updated, so that anyone reacting to the
    // future observes the FAILED state.
    future.completeExceptionally(exception);
}

List<PersisterStateBatch> stateBatches = partitionData.stateBatches();
for (PersisterStateBatch stateBatch : stateBatches) {
if (stateBatch.firstOffset() < startOffset) {
log.error("Invalid state batch found for the share partition: {}-{}. The base offset: {}"
+ " is less than the start offset: {}.", groupId, topicIdPartition,
stateBatch.firstOffset(), startOffset);
throw new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition));
}
InFlightBatch inFlightBatch = new InFlightBatch(EMPTY_MEMBER_ID, stateBatch.firstOffset(),
stateBatch.lastOffset(), RecordState.forId(stateBatch.deliveryState()), stateBatch.deliveryCount(), null);
cachedState.put(stateBatch.firstOffset(), inFlightBatch);
}
// Update the endOffset of the partition.
if (!cachedState.isEmpty()) {
// If the cachedState is not empty, findNextFetchOffset flag is set to true so that any AVAILABLE records
// in the cached state are not missed
findNextFetchOffset.set(true);
endOffset = cachedState.lastEntry().getValue().lastOffset();
// In case the persister read state RPC result contains no AVAILABLE records, we can update cached state
// and start/end offsets.
maybeUpdateCachedStateAndOffsets();
} else {
endOffset = partitionData.startOffset();
/**
 * Completes the given future according to the current share partition state, when that state
 * already determines the outcome of initialization:
 * ACTIVE completes it successfully, FAILED and INITIALIZING complete it exceptionally, and
 * EMPTY leaves it pending so that the caller can start the initialization.
 *
 * @param future The initialization future to complete, if the state allows it.
 */
private void maybeCompleteInitialization(CompletableFuture<Void> future) {
    final SharePartitionState observedState = partitionState();
    final Throwable failure;
    switch (observedState) {
        case EMPTY:
            // Not initialized yet: the future stays pending.
            return;
        case ACTIVE:
            future.complete(null);
            return;
        case FAILED:
            failure = new IllegalStateException(
                String.format("Share partition failed to load %s-%s", groupId, topicIdPartition));
            break;
        case INITIALIZING:
            failure = new LeaderNotAvailableException(
                String.format("Share partition is already initializing %s-%s", groupId, topicIdPartition));
            break;
        default:
            throw new IllegalStateException("Unknown share partition state: " + observedState);
    }
    future.completeExceptionally(failure);
}

Expand Down Expand Up @@ -1352,6 +1448,16 @@ private Optional<Throwable> acknowledgeCompleteBatch(
return Optional.empty();
}

/**
 * Returns the current state of the share partition, read under the read lock.
 */
// Visible for testing
SharePartitionState partitionState() {
    final SharePartitionState snapshot;
    lock.readLock().lock();
    try {
        snapshot = partitionState;
    } finally {
        lock.readLock().unlock();
    }
    return snapshot;
}

// Visible for testing
void rollbackOrProcessStateUpdates(
CompletableFuture<Void> future,
Expand Down
Loading

0 comments on commit 4ac1dd4

Please sign in to comment.