Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16541 Fix potential leader-epoch checkpoint file corruption #15993

Merged
merged 13 commits into from
Jun 6, 2024

Conversation

ocadaruma
Copy link
Contributor

  • A patch for KAFKA-15046 got rid of fsync on LeaderEpochFileCache#truncateFromStart/End for performance reason, but it turned out this could cause corrupted leader-epoch checkpoint file on ungraceful OS shutdown
    • i.e. OS shuts down in the middle when kernel is writing dirty pages back to the device
  • To address this problem, this PR makes below changes:
    • Revert LeaderEpochCheckpoint#write to always fsync
    • truncateFromStart/End now call LeaderEpochCheckpoint#write asynchronously on scheduler thread
    • UnifiedLog#maybeCreateLeaderEpochCache now loads epoch entries from checkpoint file only when current cache is absent
      • This is to prevent potentially-stale (because of async checkpointing mentioned above) checkpoint file is read and causes epoch entries to become incorrect

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@ocadaruma ocadaruma marked this pull request as ready for review May 19, 2024 14:27
@ocadaruma
Copy link
Contributor Author

@junrao Could you take a look?

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ocadaruma : Thanks for the PR. Just a few minor comments. Also, could you rebase?

@@ -2095,7 +2101,8 @@ object UnifiedLog extends Logging {
}

/**
* If the recordVersion is >= RecordVersion.V2, then create and return a LeaderEpochFileCache.
* If the recordVersion is >= RecordVersion.V2, then create a new LeaderEpochFileCache instance
* or update current cache if any with the new checkpoint and return it.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about changing it to the following?

"If the recordVersion is >= RecordVersion.V2, create a new LeaderEpochFileCache instance. Loading the epoch entries from the backing checkpoint file or the provided currentCache if not empty."

@@ -42,10 +43,15 @@
* <p>
* Leader Epoch = epoch assigned to each leader by the controller.
* Offset = offset of the first message in each epoch.
* <p>
* Note that {@link #truncateFromStart},{@link #truncateFromEnd} flushes the epoch-entry changes to checkpoint asynchronously.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flushes => flush

LogContext logContext = new LogContext("[LeaderEpochCache " + topicPartition + "] ");
log = logContext.logger(LeaderEpochFileCache.class);
checkpoint.read().forEach(this::assign);
}

/**
* Instantiates a new LeaderEpochFileCache with replacing checkpoint with given one
* without restoring the cache from the checkpoint, with retaining the current epoch entries.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about following?

"Instantiate a new LeaderEpochFileCache with provided epoch entries instead of from the backing checkpoint file. The provided epoch entries are expected to no less fresher than the checkpoint file."

@@ -313,14 +345,14 @@ public void truncateFromEnd(long endOffset) {
if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) {
List<EpochEntry> removedEntries = removeFromEnd(x -> x.startOffset >= endOffset);

// We intentionally don't force flushing change to the device here because:
// We flush the change to the device in the background because:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be useful to explain in the comment that the reason async flush works is because the stale epochs always have more entries and no missing entries.


/**
* Returns a new LeaderEpochFileCache which contains same
* epoch entries with replacing backing checkpoint
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checkpoint => checkpoint file

@ocadaruma
Copy link
Contributor Author

@junrao thanks for the review.

I'll address the comments and rebase.
By the way, I found that current async-write could cause a race with topic deletion. (truncateFromEnd is called and async-write is scheduled => the dir is renamed to -deleted due to topic-deletion => async-flush throws IOException => log-dir is marked as offline unexpectedly)

We need to ignore NoSuchFileException for async-write failure (which is assumed to be safe).

I'll fix that part too.

@ocadaruma ocadaruma requested a review from junrao May 23, 2024 03:34
Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ocadaruma : Thanks for taking care of the race condition with deletes. Just a couple of minor comments.

LogContext logContext = new LogContext("[LeaderEpochCache " + topicPartition + "] ");
log = logContext.logger(LeaderEpochFileCache.class);
checkpoint.read().forEach(this::assign);
}

/**
* Instantiate a new LeaderEpochFileCache with provided epoch entries instead of from the backing checkpoint file.
* The provided epoch entries are expected to no less fresher than the checkpoint file.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to no less fresher => to be no less fresh

checkpointFile.write(entries, sync);
checkpointFile.write(entries);
} catch (FileNotFoundException | NoSuchFileException e) {
log.warn("Failed to write to checkpoint file {}", file.getAbsolutePath(), e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps add in the warn that "This is ok if the topic/partition is being deleted."

@ocadaruma ocadaruma requested a review from junrao May 23, 2024 22:48
Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ocadaruma : Thanks for the updated PR. One more minor comment.

LogContext logContext = new LogContext("[LeaderEpochCache " + topicPartition + "] ");
log = logContext.logger(LeaderEpochFileCache.class);
checkpoint.read().forEach(this::assign);
}

/**
* Instantiate a new LeaderEpochFileCache with provided epoch entries instead of from the backing checkpoint file.
* The provided epoch entries are expected to no less fresh than the checkpoint file.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to no less fresh => to be no less fresh

@ocadaruma ocadaruma requested a review from junrao May 23, 2024 23:15
Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ocadaruma : Thanks for the updated PR. The code LGTM. There were 147 failed tests and many of them are related to tiered storage. Are they related to this PR?

@ocadaruma
Copy link
Contributor Author

@junrao Thank you for pointing out.

Some of them are related. I found that RemoteLogManager#getLeaderEpochCheckpoint needs synchronous flush for truncatation.

I added a fix for that.

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ocadaruma : Thanks for the updated PR. Left one more comment.

@@ -575,9 +575,9 @@ InMemoryLeaderEpochCheckpoint getLeaderEpochCheckpoint(UnifiedLog log, long star
if (log.leaderEpochCache().isDefined()) {
LeaderEpochFileCache cache = log.leaderEpochCache().get().writeTo(checkpoint);
if (startOffset >= 0) {
cache.truncateFromStart(startOffset);
cache.truncateFromStart(startOffset, true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes the code a bit hard to understand. I am wondering if we could improve it. The existing usage of InMemoryLeaderEpochCheckpoint is kind of awkward. Its only purpose is to get a list of epoch entries from LEaderEpochCache within a specified offset range. But the way that it achieves the goal is indirect and complicated.

Instead of using InMemoryLeaderEpochCheckpoint, perhaps we could add a new method like List<EpochEntry> epochEntriesInRange(long startOffset, long endOffset) in LeaderEpochCache that returns a list of epoch entries within startOffset and endOffset. Then, we could pull the logic in readAsByteBuffer.InMemoryLeaderEpochCheckpoint() to a static method. This way, we don't need to add the flushSync option in truncateFromStart() and can get rid of InMemoryLeaderEpochCheckpoint. What do you think? cc @satishd

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the way that it achieves the goal is indirect and complicated.

Agree. I +1 for adding a method for directly retrieve necessary epoch entries for RLM.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@junrao Thank you for pointing out. I removed InMemoryLeaderEpochCheckpoint (and LeaderEpochCheckpoint interface as well) and refactored the PR based on that.

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ocadaruma : Thanks for the updated PR. A few more minor comments.

*
* @param log The actual log from where to take the leader-epoch checkpoint
* @param startOffset The start offset of the checkpoint file (exclusive in the truncation).
* @param startOffset The start offset of the epoch entries (exclusive).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the caller's perspective, the start offset is inclusive and the end offset is exclusive.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, that's right

@@ -42,10 +45,15 @@
* <p>
* Leader Epoch = epoch assigned to each leader by the controller.
* Offset = offset of the first message in each epoch.
* <p>
* Note that {@link #truncateFromStart},{@link #truncateFromEnd} flush the epoch-entry changes to checkpoint asynchronously.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps name truncateFromStart and truncateFromEnd to sth like truncateFromStartAsyncFlush and truncateFromEndAsyncFlush to make it clear?

// - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries.
// * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by
// another truncateFromEnd call on log loading procedure, so it won't be a problem
scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, why do we need to add a trailing - in "leader-epoch-cache-flush-"?

Copy link
Contributor Author

@ocadaruma ocadaruma Jun 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not strictly necessary. I just followed some of existing scheduled tasks's convention (e.g.

scheduler.scheduleOnce(s"load-txns-for-partition-$topicPartition", () => loadTransactions(scheduleStartMs))
)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I didn't see the partition after the prefix. So, this is fine then.

@@ -128,15 +162,17 @@ private void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) {
}
}

private List<EpochEntry> removeFromEnd(Predicate<EpochEntry> predicate) {
private static List<EpochEntry> removeFromEnd(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we fold this method into the caller since it only has 1 line? Ditto for removeFromStart.


/**
* Returns the leader epoch entries within the range of the given start[exclusive] and end[inclusive] offset
* @param startOffset The start offset of the epoch entries (exclusive).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the caller's perspective, start offset is inclusive and end offset in exclusive.

@ocadaruma ocadaruma requested a review from junrao June 4, 2024 01:07
@ocadaruma
Copy link
Contributor Author

@junrao Thank you for your feedbacks. Addressed the comments.

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ocadaruma : Thanks for the updated PR. The code LGTM. Are the 29 test failures related to this PR?

@ocadaruma
Copy link
Contributor Author

ocadaruma commented Jun 6, 2024

@junrao I don't think they relate.

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ocadaruma : Thanks for triaging the tests. The PR LGTM.

@junrao junrao merged commit 3835515 into apache:trunk Jun 6, 2024
1 check failed
apourchet added a commit to apourchet/kafka that referenced this pull request Jun 6, 2024
commit ee834d9
Author: Antoine Pourchet <[email protected]>
Date:   Thu Jun 6 15:20:48 2024 -0600

    KAFKA-15045: (KIP-924 pt. 19) Update to new AssignmentConfigs (apache#16219)

    This PR updates all of the streams task assignment code to use the new AssignmentConfigs public class.

    Reviewers: Anna Sophie Blee-Goldman <[email protected]>

commit 8a2bc3a
Author: Bruno Cadonna <[email protected]>
Date:   Thu Jun 6 21:19:52 2024 +0200

    KAFKA-16903: Consider produce error of different task (apache#16222)

    A task does not know anything about a produce error thrown
    by a different task. That might lead to a InvalidTxnStateException
    when a task attempts to do a transactional operation on a producer
    that failed due to a different task.

    This commit stores the produce exception in the streams producer
    on completion of a send instead of the record collector since the
    record collector is on task level whereas the stream producer
    is on stream thread level. Since all tasks use the same streams
    producer the error should be correctly propagated across tasks
    of the same stream thread.

    For EOS alpha, this commit does not change anything because
    each task uses its own producer. The send error is still
    on task level but so is also the transaction.

    Reviewers: Matthias J. Sax <[email protected]>

commit 7d832cf
Author: David Jacot <[email protected]>
Date:   Thu Jun 6 21:19:20 2024 +0200

    KAFKA-14701; Move `PartitionAssignor` to new `group-coordinator-api` module (apache#16198)

    This patch moves the `PartitionAssignor` interface and all the related classes to a newly created `group-coordinator/api` module, following the pattern used by the storage and tools modules.

    Reviewers: Ritika Reddy <[email protected]>, Jeff Kim <[email protected]>, Chia-Ping Tsai <[email protected]>

commit 79ea7d6
Author: Mickael Maison <[email protected]>
Date:   Thu Jun 6 20:28:39 2024 +0200

    MINOR: Various cleanups in clients (apache#16193)

    Reviewers: Chia-Ping Tsai <[email protected]>

commit a41f7a4
Author: Murali Basani <[email protected]>
Date:   Thu Jun 6 18:06:25 2024 +0200

    KAFKA-16884 Refactor RemoteLogManagerConfig with AbstractConfig (apache#16199)

    Reviewers: Greg Harris <[email protected]>, Kamal Chandraprakash <[email protected]>, Chia-Ping Tsai <[email protected]>

commit 0ed104c
Author: Kamal Chandraprakash <[email protected]>
Date:   Thu Jun 6 21:26:08 2024 +0530

    MINOR: Cleanup the storage module unit tests (apache#16202)

    - Use SystemTime instead of MockTime when time is not mocked
    - Use static assertions to reduce the line length
    - Fold the lines if it exceeds the limit
    - rename tp0 to tpId0 when it refers to TopicIdPartition

    Reviewers: Kuan-Po (Cooper) Tseng <[email protected]>, Chia-Ping Tsai <[email protected]>

commit f36a873
Author: Cy <[email protected]>
Date:   Thu Jun 6 08:46:49 2024 -0700

    MINOR: Added test for ClusterConfig#displayTags (apache#16110)

    Reviewers: Chia-Ping Tsai <[email protected]>

commit 226f3c5
Author: Sanskar Jhajharia <[email protected]>
Date:   Thu Jun 6 18:48:23 2024 +0530

    MINOR: Code cleanup in metadata module (apache#16065)

    Reviewers: Mickael Maison <[email protected]>

commit ebe1e96
Author: Loïc GREFFIER <[email protected]>
Date:   Thu Jun 6 13:40:31 2024 +0200

    KAFKA-16448: Add ProcessingExceptionHandler interface and implementations (apache#16187)

    This PR is part of KAFKA-16448 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing.

    This PR brings ProcessingExceptionHandler interface and default implementations.

    Co-authored-by: Dabz <[email protected]>
    Co-authored-by: sebastienviale <[email protected]>

    Reviewer: Bruno Cadonna <[email protected]>

commit b74b182
Author: Lianet Magrans <[email protected]>
Date:   Thu Jun 6 09:45:36 2024 +0200

    KAFKA-16786: Remove old assignment strategy usage in new consumer (apache#16214)

    Remove usage of the partition.assignment.strategy config in the new consumer. This config is deprecated with the new consumer protocol, so the AsyncKafkaConsumer should not use or validate the property.

    Reviewers: Lucas Brutschy <[email protected]>

commit f880ad6
Author: Alyssa Huang <[email protected]>
Date:   Wed Jun 5 23:30:49 2024 -0700

    KAFKA-16530: Fix high-watermark calculation to not assume the leader is in the voter set (apache#16079)

    1. Changing log message from error to info - We may expect the HW calculation to give us a smaller result than the current HW in the case of quorum reconfiguration. We will continue to not allow the HW to actually decrease.
    2. Logic for finding the updated LeaderEndOffset for updateReplicaState is changed as well. We do not assume the leader is in the voter set and check the observer states as well.
    3. updateLocalState now accepts an additional "lastVoterSet" param which allows us to update the leader state with the last known voters. any nodes in this set but not in voterStates will be added to voterStates and removed from observerStates, any nodes not in this set but in voterStates will be removed from voterStates and added to observerStates

    Reviewers: Luke Chen <[email protected]>, José Armando García Sancio <[email protected]>

commit 3835515
Author: Okada Haruki <[email protected]>
Date:   Thu Jun 6 15:10:13 2024 +0900

    KAFKA-16541 Fix potential leader-epoch checkpoint file corruption (apache#15993)

    A patch for KAFKA-15046 got rid of fsync on LeaderEpochFileCache#truncateFromStart/End for performance reason, but it turned out this could cause corrupted leader-epoch checkpoint file on ungraceful OS shutdown, i.e. OS shuts down in the middle when kernel is writing dirty pages back to the device.

    To address this problem, this PR makes below changes: (1) Revert LeaderEpochCheckpoint#write to always fsync
    (2) truncateFromStart/End now call LeaderEpochCheckpoint#write asynchronously on scheduler thread
    (3) UnifiedLog#maybeCreateLeaderEpochCache now loads epoch entries from checkpoint file only when current cache is absent

    Reviewers: Jun Rao <[email protected]>

commit 7763243
Author: Florin Akermann <[email protected]>
Date:   Thu Jun 6 00:22:31 2024 +0200

    KAFKA-12317: Update FK-left-join documentation (apache#15689)

    FK left-join was changed via KIP-962. This PR updates the docs accordingly.

    Reviewers: Ayoub Omari <[email protected]>, Matthias J. Sax <[email protected]>

commit 1134520
Author: Ayoub Omari <[email protected]>
Date:   Thu Jun 6 00:05:04 2024 +0200

    KAFKA-16573: Specify node and store where serdes are needed (apache#15790)

    Reviewers: Matthias J. Sax <[email protected]>, Bruno Cadonna <[email protected]>, Anna Sophie Blee-Goldman <[email protected]>

commit 896af1b
Author: Sanskar Jhajharia <[email protected]>
Date:   Thu Jun 6 01:46:59 2024 +0530

    MINOR: Raft module Cleanup (apache#16205)

    Reviewers: Chia-Ping Tsai <[email protected]>

commit 0109a3f
Author: Antoine Pourchet <[email protected]>
Date:   Wed Jun 5 14:09:37 2024 -0600

    KAFKA-15045: (KIP-924 pt. 17) State store computation fixed (apache#16194)

    Fixed the calculation of the store name list based on the subtopology being accessed.

    Also added a new test to make sure this new functionality works as intended.

    Reviewers: Anna Sophie Blee-Goldman <[email protected]>

commit 52514a8
Author: Greg Harris <[email protected]>
Date:   Wed Jun 5 11:35:32 2024 -0700

    KAFKA-16858: Throw DataException from validateValue on array and map schemas without inner schemas (apache#16161)

    Signed-off-by: Greg Harris <[email protected]>
    Reviewers: Chris Egerton <[email protected]>

commit f2aafcc
Author: Sanskar Jhajharia <[email protected]>
Date:   Wed Jun 5 20:06:01 2024 +0530

    MINOR: Cleanups in Shell Module (apache#16204)

    Reviewers: Chia-Ping Tsai <[email protected]>

commit bd9d68f
Author: Abhijeet Kumar <[email protected]>
Date:   Wed Jun 5 19:12:25 2024 +0530

    KAFKA-15265: Integrate RLMQuotaManager for throttling fetches from remote storage (apache#16071)

    Reviewers: Kamal Chandraprakash<[email protected]>, Luke Chen <[email protected]>, Satish Duggana <[email protected]>

commit 62e5cce
Author: gongxuanzhang <[email protected]>
Date:   Wed Jun 5 18:57:32 2024 +0800

    KAFKA-10787 Update spotless version and remove support JDK8 (apache#16176)

    Reviewers: Chia-Ping Tsai <[email protected]>

commit 02c794d
Author: Kamal Chandraprakash <[email protected]>
Date:   Wed Jun 5 12:12:23 2024 +0530

    KAFKA-15776: Introduce remote.fetch.max.timeout.ms to configure DelayedRemoteFetch timeout (apache#14778)

    KIP-1018, part1, Introduce remote.fetch.max.timeout.ms to configure DelayedRemoteFetch timeout

    Reviewers: Luke Chen <[email protected]>

commit 7ddfa64
Author: Dongnuo Lyu <[email protected]>
Date:   Wed Jun 5 02:08:38 2024 -0400

    MINOR: Adjust validateOffsetCommit/Fetch in ConsumerGroup to ensure compatibility with classic protocol members (apache#16145)

    During online migration, there could be ConsumerGroup that has members that uses the classic protocol. In the current implementation, `STALE_MEMBER_EPOCH` could be thrown in ConsumerGroup offset fetch/commit validation but it's not supported by the classic protocol. Thus this patch changed `ConsumerGroup#validateOffsetCommit` and `ConsumerGroup#validateOffsetFetch` to ensure compatibility.

    Reviewers: Jeff Kim <[email protected]>, David Jacot <[email protected]>

commit 252c1ac
Author: Apoorv Mittal <[email protected]>
Date:   Wed Jun 5 05:55:24 2024 +0100

    KAFKA-16740: Adding skeleton code for Share Fetch and Acknowledge RPC (KIP-932) (apache#16184)

    The PR adds skeleton code for Share Fetch and Acknowledge RPCs. The changes include:

    1. Defining RPCs in KafkaApis.scala
    2. Added new SharePartitionManager class which handles the RPCs handling
    3. Added SharePartition class which manages in-memory record states and for fetched data.

    Reviewers: David Jacot <[email protected]>, Andrew Schofield <[email protected]>, Manikumar Reddy <[email protected]>

commit b89999b
Author: PoAn Yang <[email protected]>
Date:   Wed Jun 5 08:02:52 2024 +0800

    KAFKA-16483: Remove preAppendErrors from createPutCacheCallback (apache#16105)

    The method createPutCacheCallback has a input argument preAppendErrors. It is used to keep the "error" happens before appending. However, it is always empty. Also, the pre-append error is handled before createPutCacheCallback by calling responseCallback. Hence, we can remove preAppendErrors.

    Signed-off-by: PoAn Yang <[email protected]>

    Reviewers: Luke Chen <[email protected]>

commit 01e9918
Author: Kuan-Po (Cooper) Tseng <[email protected]>
Date:   Wed Jun 5 07:56:18 2024 +0800

    KAFKA-16814 KRaft broker cannot startup when `partition.metadata` is missing (apache#16165)

    When starting up kafka logManager, we'll check stray replicas to avoid some corner cases. But this check might cause broker unable to startup if partition.metadata is missing because when startup kafka, we load log from file, and the topicId of the log is coming from partition.metadata file. So, if partition.metadata is missing, the topicId will be None, and the LogManager#isStrayKraftReplica will fail with no topicID error.

    The partition.metadata missing could be some storage failure, or another possible path is unclean shutdown after topic is created in the replica, but before data is flushed into partition.metadata file. This is possible because we do the flush in async way here.

    When finding a log without topicID, we should treat it as a stray log and then delete it.

    Reviewers: Luke Chen <[email protected]>, Gaurav Narula <[email protected]>

commit d652f5c
Author: TingIāu "Ting" Kì <[email protected]>
Date:   Wed Jun 5 07:52:06 2024 +0800

    MINOR: Add topicIds and directoryIds to the return value of the toString method. (apache#16189)

    Add topicIds and directoryIds to the return value of the toString method.

    Reviewers: Luke Chen <[email protected]>

commit 7e0caad
Author: Igor Soarez <[email protected]>
Date:   Tue Jun 4 22:12:33 2024 +0100

    MINOR: Cleanup unused references in core (apache#16192)

    Reviewers: Chia-Ping Tsai <[email protected]>

commit 9821aca
Author: PoAn Yang <[email protected]>
Date:   Wed Jun 5 05:09:04 2024 +0800

    MINOR: Upgrade gradle from 8.7 to 8.8 (apache#16190)

    Reviewers: Chia-Ping Tsai <[email protected]>

commit 9ceed8f
Author: Colin P. McCabe <[email protected]>
Date:   Tue Jun 4 14:04:59 2024 -0700

    KAFKA-16535: Implement AddVoter, RemoveVoter, UpdateVoter RPCs

    Implement the add voter, remove voter, and update voter RPCs for
    KIP-853. This is just adding the RPC handling; the current
    implementation in RaftManager just throws UnsupportedVersionException.

    Reviewers: Andrew Schofield <[email protected]>, José Armando García Sancio <[email protected]>

commit 8b3c77c
Author: TingIāu "Ting" Kì <[email protected]>
Date:   Wed Jun 5 04:21:20 2024 +0800

    KAFKA-15305 The background thread should try to process the remaining task until the shutdown timer is expired. (apache#16156)

    Reviewers: Lianet Magrans <[email protected]>, Chia-Ping Tsai <[email protected]>

commit cda2df5
Author: Kamal Chandraprakash <[email protected]>
Date:   Wed Jun 5 00:41:30 2024 +0530

    KAFKA-16882 Migrate RemoteLogSegmentLifecycleTest to ClusterInstance infra (apache#16180)

    - Removed the RemoteLogSegmentLifecycleManager
    - Removed the TopicBasedRemoteLogMetadataManagerWrapper, RemoteLogMetadataCacheWrapper, TopicBasedRemoteLogMetadataManagerHarness and TopicBasedRemoteLogMetadataManagerWrapperWithHarness

    Reviewers: Kuan-Po (Cooper) Tseng <[email protected]>, Chia-Ping Tsai <[email protected]>

commit 2b47798
Author: Chris Egerton <[email protected]>
Date:   Tue Jun 4 21:04:34 2024 +0200

    MINOR: Fix return tag on Javadocs for consumer group-related Admin methods (apache#16197)

    Reviewers: Greg Harris <[email protected]>, Chia-Ping Tsai <[email protected]>
@ocadaruma ocadaruma deleted the kafka-16541 branch June 8, 2024 02:08
TaiJuWu pushed a commit to TaiJuWu/kafka that referenced this pull request Jun 8, 2024
…ache#15993)

A patch for KAFKA-15046 got rid of fsync on LeaderEpochFileCache#truncateFromStart/End for performance reason, but it turned out this could cause corrupted leader-epoch checkpoint file on ungraceful OS shutdown, i.e. OS shuts down in the middle when kernel is writing dirty pages back to the device.

To address this problem, this PR makes below changes: (1) Revert LeaderEpochCheckpoint#write to always fsync
(2) truncateFromStart/End now call LeaderEpochCheckpoint#write asynchronously on scheduler thread
(3) UnifiedLog#maybeCreateLeaderEpochCache now loads epoch entries from checkpoint file only when current cache is absent

Reviewers: Jun Rao <[email protected]>
satishd pushed a commit that referenced this pull request Jun 12, 2024
…5993)

A patch for KAFKA-15046 got rid of fsync on LeaderEpochFileCache#truncateFromStart/End for performance reason, but it turned out this could cause corrupted leader-epoch checkpoint file on ungraceful OS shutdown, i.e. OS shuts down in the middle when kernel is writing dirty pages back to the device.

To address this problem, this PR makes below changes: (1) Revert LeaderEpochCheckpoint#write to always fsync
(2) truncateFromStart/End now call LeaderEpochCheckpoint#write asynchronously on scheduler thread
(3) UnifiedLog#maybeCreateLeaderEpochCache now loads epoch entries from checkpoint file only when current cache is absent

Reviewers: Jun Rao <[email protected]>
gongxuanzhang pushed a commit to gongxuanzhang/kafka that referenced this pull request Jun 12, 2024
…ache#15993)

A patch for KAFKA-15046 got rid of fsync on LeaderEpochFileCache#truncateFromStart/End for performance reason, but it turned out this could cause corrupted leader-epoch checkpoint file on ungraceful OS shutdown, i.e. OS shuts down in the middle when kernel is writing dirty pages back to the device.

To address this problem, this PR makes below changes: (1) Revert LeaderEpochCheckpoint#write to always fsync
(2) truncateFromStart/End now call LeaderEpochCheckpoint#write asynchronously on scheduler thread
(3) UnifiedLog#maybeCreateLeaderEpochCache now loads epoch entries from checkpoint file only when current cache is absent

Reviewers: Jun Rao <[email protected]>
// - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries.
// * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by
// another truncateFromStart call on log loading procedure, so it won't be a problem
scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ocadaruma Sorry for making noise on this merged PR. This async write seems to cause deadlock on test LogManagerTest#testLogRecoveryMetrics. I have filed jira (https://issues.apache.org/jira/browse/KAFKA-17142), and it will be great if you take a look at the solution described in the jira. thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, thanks for pointing out. Let me check

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants