Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16709: abortAndPauseCleaning only when future log is not existed #15951

Merged
merged 6 commits into from
May 28, 2024

Conversation

showuon
Copy link
Contributor

@showuon showuon commented May 14, 2024

When doing alter replica logDirs, we'll create a future log and pause log cleaning for the partition( here). And this log cleaning pausing will resume after alter replica logDirs completes (here). And when in the resuming log cleaning, we'll decrement 1 for the LogCleaningPaused count. Once the count reached 0, the cleaning pause is really resuming. (here). For more explanation about the logCleaningPaused state can check here.

But, there's still one factor that could increase the LogCleaningPaused count: leadership change (here). When there's a leadership change, we'll check if there's a future log in this partition, if so, we'll create future log and pauseCleaning (LogCleaningPaused count + 1). So, if during the alter replica logDirs:

1. alter replica logDirs for tp0 triggered (LogCleaningPaused count = 1)
2. tp0 leadership changed (LogCleaningPaused count = 2)
3. alter replica logDirs completes, resuming logCleaning (LogCleaningPaused count = 1)
4. LogCleaning keeps paused because the count is always >  0

This PR fixes this issue by only abortAndPauseCleaning when future log is not existed. We did the same check in alterReplicaLogDirs. So this change can make sure there's only 1 abortAndPauseCleaning for either abortAndPauseCleaning or maybeAddLogDirFetchers. Tests also added.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@showuon
Copy link
Contributor Author

showuon commented May 14, 2024

@soarez , call for review. Thanks.

Copy link
Member

@soarez soarez left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for looking into this @showuon , I've seen similar issues in production.

I'm not sure if logManager.abortAndPauseCleaning() should be called from maybeAddLogDirFetchers, since it AFAICT the fetcher will call it before starting.

futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
partition.getLeaderEpoch, futureLog.highWatermark))
if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) {
// Add future replica log to partition's map
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this comment refers to setting (populating) partition.futureLog, which is only written to via a call partition.createLogIfNotExists().
Since we're replacing the call to partition.createLogIfNotExists() with partition.maybeCreateFutureReplica(), this comment should be moved there?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need this as described in this comment: #15951 (comment)


futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
partition.getLeaderEpoch, futureLog.highWatermark))
if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

partition.maybeCreateFutureReplica() only calls partition.createLogIfNotExists() if the future replica doesn't yet exist. If that happens, partition.futureLog won't be set, so we need to call partition.setLog(futureLog, true)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sincre this section is inside a block of logManager.getLog(topicPartition, isFuture = true).foreach { futureLog =>, doesn't that mean this only runs if the future replica exists?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

partition.maybeCreateFutureReplica() only calls partition.createLogIfNotExists() if the future replica doesn't yet exist. If that happens, partition.futureLog won't be set, so we need to call partition.setLog(futureLog, true)?

partition.setLog(futureLog, true) is not necessary because when in partition.createLogIfNotExists(), we'll also set futureLog:

  if (isFutureReplica) {
      this.futureLog = Some(maybeCreate(this.futureLog))

Sincre this section is inside a block of logManager.getLog(topicPartition, isFuture = true).foreach { futureLog =>, doesn't that mean this only runs if the future replica exists?

Yes, normally, when we created future log is in this path:

  1. ReplicaManager#alterReplicaLogDirs
  2. partition.maybeCreateFutureReplica
  3. partition.createLogIfNotExists
  4. partition.createLog
  5. logManager.getOrCreateLog

So in the end, we'll have future log added in both logManager#futureLogs map and partition#futureLog` map.

But it's possible that the future log only exists in logManager#futureLogs map, but not in partition#futureLog map, when the future log is created, and before alter logDir completed, the broker restarted. So, after restarted, we'll add the future log into logManager map during loadLog. But the partition#futureLog map won't be updated, until we got leadership update and ReplicaManager#maybeAddLogDirFetchers is called. In this case, the partition.maybeCreateFutureReplica will not create the future log since it's existed, but the partition#futureLog will get updated with the one in logManager.

That means, we have to do the partition.createLogIfNotExists() here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That means, we have to do the partition.createLogIfNotExists() here.

Given that partition.maybeCreateFutureReplica() only calls partition.createLogIfNotExists() if the future replica doesn't yet exist, and that it's possible that the future log only exists in logManager#futureLogs map, but not in partition#futureLog map – do we need to call partition.createLogIfNotExists() directly instead of ``partition.maybeCreateFutureReplica()` ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a way to determine if partition#futureLog map exists or not, so if we call partition.createLogIfNotExists(), we can't know if it is created or not. partition.maybeCreateFutureReplica() returns a boolean value to notify us what we need.

That is, partition.maybeCreateFutureReplica() only calls partition.createLogIfNotExists() when the partition#futureLog map doesn't contain the partition. That's fine because there is no chances that futureLog exists in partition#futureLog but not in logManager#futureLogs map.

Sorry, maybe I didn't get your question here. Could you explain again if I misunderstand it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fine because there is no chances that futureLog exists in partition#futureLog but not in logManager#futureLogs map.

The scenario I was thinking of is when the broker starts up, logManager loads the future log, so futureLog exists in partition#futureLog but not in logManager#futureLogs map yet. Only later the when the broker catches up with metatada (ReplicaManager#applyDelta) or receives a LeaderAndIsr request (becomeLeaderOrFollower) and this method maybeAddLogDirFetchers is called, is when we need to make sure partition#futureLog is populated too.

But you're right, my confusion here was with where maybeCreateFutureReplica checks if the futureLog already exists, it checks in itself (Partition) not in LogManager, so this makes sense.

.setPartitionEpoch(0)
.setReplicas(Seq[Integer](0).asJava)
.setIsNew(false)).asJava,
Collections.singletonMap(topic, Uuid.randomUuid()),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that LeaderAndIsrRequest carries a different topic id ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's possible, and it's been handled in ReplicaManager here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pardon me, Do you mean that topic id of future log can be different to topic id of log?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the future log is just a follower of the original log, so it must have the same topic ID as the original one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My point was this test case will produce different topic ids. For example, the following assert will fail

      assertEquals(spyLogManager.getLog(tp0, isFuture = false).get.topicId,
        spyLogManager.getLog(tp0, isFuture = true).get.topicId)

It seems becomeLeaderOrFollower set the topic id of "log", and the mock return a different topic id of future log. IIRC (and @showuon your confirm) they should have same topic ID, and so I'm a bit confused by this test case.

Please feel free to correct me if I misunderstand anything.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I know what you're talking about now. The reason it won't fail is because the topicId we feed into the partition object is None. So the topicId consistency check will always pass because the original topicId is not set. I've updated the test and also verify the response has no errors in this commit: 4a1b76d . Thanks.

Comment on lines 2123 to 2125

futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
partition.getLeaderEpoch, futureLogInPartition.highWatermark))
}

futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
partition.getLeaderEpoch, futureLog.highWatermark))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks to the failing test, I found I was wrong. We should always add the partition into fetch thread no matter we created the future log or not since before maybeAddLogDirFetchers is called, the fetchers are all removed.

@showuon
Copy link
Contributor Author

showuon commented May 21, 2024

@chia7712 , I've updated the PR. Please take a look again when available. Thanks.

@soarez soarez self-requested a review May 21, 2024 11:11
@soarez
Copy link
Member

soarez commented May 21, 2024

Sorry, I clicked the wrong window, did not mean to approve, I'm still reviewing this

Copy link
Member

@soarez soarez left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for fixing this @showuon.

This PR looks good to me!

I ran all the tests that failed in CI locally and they all pass.

// pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
// replica from source dir to destination dir
logManager.abortAndPauseCleaning(topicPartition)
// Add future replica log to partition's map if it's not existed
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If maybeCreateFutureReplica return false, we assume another thread already add the future log to partition and invoke alter thread. Hence, we don't need to abort cleaning since another thread does it.

However, adding alter thread (replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)) is not in this check. Is it possible that alter thread, which is invoked by another thread, just remove the future log and then this thread add the topic partition to replicaAlterLogDirsManager? It seems to me that alter thread will get fail as future log of partition is gone.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, adding alter thread (replicaAlterLogDirsManager.addFetcherForPartitions(futureReplicasAndInitialOffset)) is not in this check. Is it possible that alter thread, which is invoked by another thread, just remove the future log and then this thread add the topic partition to replicaAlterLogDirsManager? It seems to me that alter thread will get fail as future log of partition is gone.

That's possible. But I think that's fine because the removal of future log could because:

  1. alter logDir completes. In this case, the new leaderAndIsr request or topic partition update will updated and this fetcher will be removed then in ReplicaManager#makeLeader or makeFollower.
  2. Another log failure happened. In this case the createLogIfInexsted will fail, too.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, my previous comments is incorrect. Both alterReplicaLogDirs and maybeAddLogDirFetchers are in replicaStateChangeLock, so the race condition I described should not happen.

However, I'm thinking whether it is fine to add alter thread by maybeAddLogDirFetchers even though the future log of partition is already created by another thread. Although no new alter thread will be created as BrokerIdAndFetcherId is identical.

In short, alterReplicaLogDirs adds alter thread [0] only if it succeeds to create future log of partition. Maybe maybeAddLogDirFetchers should follow same rule? Or we can add comments to say "that is fine as replicaAlterLogDirsManager.addFetcherForPartitions will be a no-op in this case?

[0]

if (partition.maybeCreateFutureReplica(destinationDir, highWatermarkCheckpoints)) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chia7712 , thanks for the comment.
For this:

In short, alterReplicaLogDirs adds alter thread [0] only if it succeeds to create future log of partition. Maybe maybeAddLogDirFetchers should follow same rule? Or we can add comments to say "that is fine as replicaAlterLogDirsManager.addFetcherForPartitions will be a no-op in this case?

I chose latter option because if we only create fetcher when future log is inexisted, it might cause potential side effect that this fetcher is removed when leadership change, but not get added later. I've added the comment in this commit: 0d78e49 . Thanks.

Copy link
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@showuon
Copy link
Contributor Author

showuon commented May 28, 2024

Failed tests are unrelated.

@showuon showuon merged commit 91284d8 into apache:trunk May 28, 2024
1 check failed
apourchet added a commit to apourchet/kafka that referenced this pull request May 29, 2024
commit cc269b0
Author: Antoine Pourchet <[email protected]>
Date:   Wed May 29 14:15:02 2024 -0600

    KAFKA-15045: (KIP-924 pt. 14) Callback to TaskAssignor::onAssignmentComputed (apache#16123)

    This PR adds the logic and wiring necessary to make the callback to
    TaskAssignor::onAssignmentComputed with the necessary parameters.

    We also fixed some log statements in the actual assignment error
    computation, as well as modified the ApplicationState::allTasks method
    to return a Map instead of a Set of TaskInfos.

    Reviewers: Anna Sophie Blee-Goldman <[email protected]>

commit 862ea12
Author: Eugene Mitskevich <[email protected]>
Date:   Wed May 29 16:14:37 2024 -0400

    MINOR: Fix rate metric spikes (apache#15889)

    Rate reports value in the form of sumOrCount/monitoredWindowSize. It has a bug in monitoredWindowSize calculation, which leads to spikes in result values.

    Reviewers: Jun Rao <[email protected]>

commit 0f0c9ec
Author: gongxuanzhang <[email protected]>
Date:   Thu May 30 01:08:17 2024 +0800

    KAFKA-16771 First log directory printed twice when formatting storage (apache#16010)

    Reviewers: Chia-Ping Tsai <[email protected]>

commit 2d9994e
Author: Andrew Schofield <[email protected]>
Date:   Wed May 29 16:31:52 2024 +0100

    KAFKA-16722: Introduce ConsumerGroupPartitionAssignor interface (apache#15998)

    KIP-932 introduces share groups to go alongside consumer groups. Both kinds of group use server-side assignors but it is unlikely that a single assignor class would be suitable for both. As a result, the KIP introduces specific interfaces for consumer group and share group partition assignors.

    This PR introduces only the consumer group interface, `o.a.k.coordinator.group.assignor.ConsumerGroupPartitionAssignor`. The share group interface will come in a later release. The existing implementations of the general `PartitionAssignor` interface have been changed to implement `ConsumerGroupPartitionAssignor` instead and all other code changes are just propagating the change throughout the codebase.

    Note that the code in the group coordinator that actually calculates assignments uses the general `PartitionAssignor` interface so that it can be used with both kinds of group, even though the assignors themselves are specific.

    Reviewers: Apoorv Mittal <[email protected]>, David Jacot <[email protected]>

commit 0b75cf7
Author: gongxuanzhang <[email protected]>
Date:   Wed May 29 22:38:00 2024 +0800

    KAFKA-16705 the flag "started" of RaftClusterInstance is false even though the cluster is started (apache#15946)

    Reviewers: Chia-Ping Tsai <[email protected]>

commit 8d11d95
Author: Loïc GREFFIER <[email protected]>
Date:   Wed May 29 14:09:22 2024 +0200

    KAFKA-16448: Add ProcessingExceptionHandler interface and implementations (apache#16090)

    This PR is part of KAFKA-16448 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing.

    This PR brings ProcessingExceptionHandler interface and default implementations.

    Co-authored-by: Dabz <[email protected]>
    Co-authored-by: sebastienviale <[email protected]>

    Reviewer: Bruno Cadonna <[email protected]>

commit b73f479
Author: Ramin Gharib <[email protected]>
Date:   Wed May 29 13:12:54 2024 +0200

    KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide (apache#15601)

    The introduced changes provide a cleaner definition of the join side in KStreamKStreamJoin. Before, this was done by using a Boolean flag, which led to returning a raw LeftOrRightValue without generic arguments because the generic type arguments depended on the boolean input.

    Reviewers: Greg Harris <[email protected]>, Bruno Cadonna <[email protected]>

commit 897cab2
Author: Luke Chen <[email protected]>
Date:   Wed May 29 15:30:18 2024 +0800

    KAFKA-16399: Add JBOD support in tiered storage (apache#15690)

    After JBOD is supported in KRaft, we should also enable JBOD support in tiered storage. Unit tests and Integration tests are also added.

    Reviewers: Satish Duggana <[email protected]>, Kamal Chandraprakash <[email protected]>, Igor Soarez <[email protected]>, Mickael Maison <[email protected]>

commit eefd114
Author: Dongnuo Lyu <[email protected]>
Date:   Wed May 29 02:21:30 2024 -0400

    KAFKA-16832; LeaveGroup API for upgrading ConsumerGroup (apache#16057)

    This patch implements the LeaveGroup API to the consumer groups that are in the mixed mode.

    Reviewers: Jeff Kim <[email protected]>, David Jacot <[email protected]>

commit 9562143
Author: A. Sophie Blee-Goldman <[email protected]>
Date:   Tue May 28 21:35:02 2024 -0700

    HOTFIX: remove unnecessary list creation (apache#16117)

    Removing a redundant list declaration in the new StickyTaskAssignor implementation

    Reviewers: Antoine Pourchet <[email protected]>

commit d64e3fb
Author: Antoine Pourchet <[email protected]>
Date:   Tue May 28 20:43:30 2024 -0600

    KAFKA-15045: (KIP-924 pt. 13) AssignmentError calculation added (apache#16114)

    This PR adds the post-processing of the TaskAssignment to figure out if the new assignment is valid, and return an AssignmentError otherwise.

    Reviewers: Anna Sophie Blee-Goldman <[email protected]>

commit 8d243df
Author: Antoine Pourchet <[email protected]>
Date:   Tue May 28 19:01:18 2024 -0600

    KAFKA-15045: (KIP-924 pt. 12) Wiring in new assignment configs and logic (apache#16074)

    This PR creates the new public config of KIP-924 in StreamsConfig and uses it to instantiate user-created TaskAssignors. If such a TaskAssignor is found and successfully created we then use that assignor to perform the task assignment, otherwise we revert back to the pre KIP-924 world with the internal task assignors.

    Reviewers: Anna Sophie Blee-Goldman <[email protected]>, Almog Gavra <[email protected]>

commit 56ee139
Author: Antoine Pourchet <[email protected]>
Date:   Tue May 28 18:05:51 2024 -0600

    KAFKA-15045: (KIP-924 pt. 11) Implemented StickyTaskAssignor (apache#16052)

    This PR implements the StickyTaskAssignor with the new KIP 924 API.

    Reviewers: Anna Sophie Blee-Goldman <[email protected]>

commit 59ba555
Author: Nick Telford <[email protected]>
Date:   Wed May 29 00:23:23 2024 +0100

    KAFKA-15541: Add oldest-iterator-open-since-ms metric (apache#16041)

    Part of [KIP-989](https://cwiki.apache.org/confluence/x/9KCzDw).

    This new `StateStore` metric tracks the timestamp that the oldest
    surviving Iterator was created.

    This timestamp should continue to climb, and closely track the current
    time, as old iterators are closed and new ones created. If the timestamp
    remains very low (i.e. old), that suggests an Iterator has leaked, which
    should enable users to isolate the affected store.

    It will report no data when there are no currently open Iterators.

    Reviewers: Matthias J. Sax <[email protected]>

commit 4eb60b5
Author: Frederik Rouleau <[email protected]>
Date:   Tue May 28 23:56:47 2024 +0200

    KAFKA-16507 Add KeyDeserializationException and ValueDeserializationException with record content (apache#15691)

    Implements KIP-1036.

    Add raw ConsumerRecord data to RecordDeserialisationException to make DLQ implementation easier.

    Reviewers: Kirk True <[email protected]>, Andrew Schofield <[email protected]>, Matthias J. Sax <[email protected]>

commit 4d04eb8
Author: PoAn Yang <[email protected]>
Date:   Wed May 29 03:13:33 2024 +0800

    KAFKA-16796 Introduce new org.apache.kafka.tools.api.Decoder to replace kafka.serializer.Decoder (apache#16064)

    Reviewers: Chia-Ping Tsai <[email protected]>

commit a649bc4
Author: Luke Chen <[email protected]>
Date:   Wed May 29 00:05:49 2024 +0800

    KAFKA-16711: Make sure to update highestOffsetInRemoteStorage after log dir change (apache#15947)

    Reviewers: Kamal Chandraprakash<[email protected]>, Satish Duggana <[email protected]>

commit 64f699a
Author: Omnia Ibrahim <[email protected]>
Date:   Tue May 28 15:22:54 2024 +0100

    KAFKA-15853: Move general configs out of KafkaConfig (apache#16040)

    Reviewers: Mickael Maison <[email protected]>, Chia-Ping Tsai <[email protected]>

commit 699438b
Author: Sanskar Jhajharia <[email protected]>
Date:   Tue May 28 16:34:44 2024 +0530

    MINOR: Fix the config name in ProducerFailureHandlingTest (apache#16099)

    When moving from KafkaConfig.ReplicaFetchMaxBytesProp we used ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_CONFIG instead of ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_CONFIG. This PR patches the same.

    Reviewers: Omnia Ibrahim <[email protected]>, Manikumar Reddy <[email protected]>

commit a57c05b
Author: Ken Huang <[email protected]>
Date:   Tue May 28 17:42:33 2024 +0900

    KAFKA-16805 Stop using a ClosureBackedAction to configure Spotbugs reports (apache#16081)

    Reviewers: Chia-Ping Tsai <[email protected]>

commit 91284d8
Author: Luke Chen <[email protected]>
Date:   Tue May 28 12:23:34 2024 +0800

    KAFKA-16709: abortAndPauseCleaning only when future log is not existed (apache#15951)

    When doing alter replica logDirs, we'll create a future log and pause log cleaning for the partition( here). And this log cleaning pausing will resume after alter replica logDirs completes (here). And when in the resuming log cleaning, we'll decrement 1 for the LogCleaningPaused count. Once the count reached 0, the cleaning pause is really resuming. (here). For more explanation about the logCleaningPaused state can check here.

    But, there's still one factor that could increase the LogCleaningPaused count: leadership change (here). When there's a leadership change, we'll check if there's a future log in this partition, if so, we'll create future log and pauseCleaning (LogCleaningPaused count + 1). So, if during the alter replica logDirs:

    1. alter replica logDirs for tp0 triggered (LogCleaningPaused count = 1)
    2. tp0 leadership changed (LogCleaningPaused count = 2)
    3. alter replica logDirs completes, resuming logCleaning (LogCleaningPaused count = 1)
    4. LogCleaning keeps paused because the count is always >  0

    This PR fixes this issue by only abortAndPauseCleaning when future log is not existed. We did the same check in alterReplicaLogDirs. So this change can make sure there's only 1 abortAndPauseCleaning for either abortAndPauseCleaning or maybeAddLogDirFetchers. Tests also added.

    Reviewers: Chia-Ping Tsai <[email protected]>, Igor Soarez <[email protected]>

commit adab48d
Author: Greg Harris <[email protected]>
Date:   Mon May 27 18:33:01 2024 -0700

    MINOR: Disable JDK 11 and 17 tests on PRs (apache#16051)

    Signed-off-by: Greg Harris <[email protected]>
    Reviewers: Justine Olshan <[email protected]>, David Arthur <[email protected]>, Ismael Juma <[email protected]>, Luke Chen <[email protected]>, Chia-Ping Tsai <[email protected]>

commit bac8df5
Author: Colin P. McCabe <[email protected]>
Date:   Mon May 27 08:53:53 2024 -0700

    MINOR: fix typo in KAFKA-16515

commit da3304e
Author: David Jacot <[email protected]>
Date:   Mon May 27 17:10:37 2024 +0200

    KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE (apache#16072)

    This patch was initially created in apache#15536.

    When there is a commit for multiple topic partitions and some, but not all, exceed the offset metadata limit, the pending commit is not properly cleaned up leading to UNSTABLE_OFFSET_COMMIT errors when trying to fetch the offsets with read_committed. This change makes it so the invalid commits are not added to the pendingOffsetCommits set.

    Co-authored-by: Kyle Phelps <[email protected]>

    Reviewers: Chia-Ping Tsai <[email protected]>, Justine Olshan <[email protected]>

commit 524ad1e
Author: Kamal Chandraprakash <[email protected]>
Date:   Mon May 27 15:14:23 2024 +0530

    KAFKA-16452: Don't throw OOORE when converting the offset to metadata (apache#15825)

    Don't throw OFFSET_OUT_OF_RANGE error when converting the offset to metadata, and next time the leader should increment the high watermark by itself after receiving fetch requests from followers. This can happen when checkpoint files are missing and being elected as a leader.

    Reviewers: Luke Chen <[email protected]>, Jun Rao <[email protected]>

commit d9ee9c9
Author: Nick Telford <[email protected]>
Date:   Sat May 25 20:22:56 2024 +0100

    KAFKA-15541: Use LongAdder instead of AtomicInteger (apache#16076)

    `LongAdder` performs better than `AtomicInteger` when under contention
    from many threads. Since it's possible that many Interactive Query
    threads could create a large number of `KeyValueIterator`s, we don't
    want contention on a metric to be a performance bottleneck.

    The trade-off is memory, as `LongAdder` uses more memory to space out
    independent counters across different cache lines. In practice, I don't
    expect this to cause too many problems, as we're only constructing 1
    per-store.

    Reviewers: Matthias J. Sax <[email protected]>

commit a8d166c
Author: Ritika Reddy <[email protected]>
Date:   Sat May 25 09:06:15 2024 -0700

    KAFKA-16625; Reverse lookup map from topic partitions to members (apache#15974)

    This patch speeds up the computation of the unassigned partitions by exposing the inverted target assignment. It allows the assignor to check whether a partition is assigned or not.

    Reviewers: Jeff Kim <[email protected]>, David Jacot <[email protected]>

commit d585a49
Author: Jeff Kim <[email protected]>
Date:   Fri May 24 16:33:57 2024 -0400

    KAFKA-16831: CoordinatorRuntime should initialize MemoryRecordsBuilder with max batch size write limit (apache#16059)

    CoordinatorRuntime should initialize MemoryRecordsBuilder with max batch size write limit. Otherwise, we default the write limit to the min buffer size of 16384 for the write limit. This causes the coordinator to threw RecordTooLargeException even when it's under the 1MB max batch size limit.

    Reviewers: David Jacot <[email protected]>

commit 8eea6b8
Author: Edoardo Comar <[email protected]>
Date:   Fri May 24 20:33:00 2024 +0100

    MINOR: mention KAFKA-15905 in docs "Notable changes in 3.7.1" (apache#16070)

    * MINOR: mention KAFKA-15905 in docs "Notable changes in 3.7.1/3.8.0"

    Co-Authored-By: Adrian Preston <[email protected]>

commit 4f55786
Author: Colin P. McCabe <[email protected]>
Date:   Mon May 20 15:41:52 2024 -0700

    KAFKA-16515: Fix the ZK Metadata cache confusion between brokers and controllers

    ZkMetadataCache could theoretically return KRaft controller information from a call to
    ZkMetadataCache.getAliveBrokerNode, which doesn't make sense. KRaft controllers are not part of the
    set of brokers. The only use-case for this functionality was in MetadataCacheControllerNodeProvider
    during ZK migration, where it allowed ZK brokers in migration mode to forward requests to
    kcontrollers when appropriate. This PR changes MetadataCacheControllerNodeProvider to simply
    delegate to quorumControllerNodeProvider in this case.

    Reviewers: José Armando García Sancio <[email protected]>

commit 90892ae
Author: Colin P. McCabe <[email protected]>
Date:   Mon May 20 16:23:27 2024 -0700

    KAFKA-16516: Fix the controller node provider for broker to control channel

    Fix the code in the RaftControllerNodeProvider to query RaftManager to find Node information,
    rather than consulting a static map. Add a RaftManager.voterNode function to supply this
    information. In KRaftClusterTest, add testControllerFailover to get more coverage of controller
    failovers.

    Reviewers: José Armando García Sancio <[email protected]>

commit 2432a18
Author: KrishVora01 <[email protected]>
Date:   Fri May 24 22:21:02 2024 +0530

    KAFKA-16373: KIP-1028:  Adding code to support Apache Kafka Docker Official Images (apache#16027)

    This PR aims to add JVM based Docker Official Image for Apache Kafka as per the following KIP - https://cwiki.apache.org/confluence/display/KAFKA/KIP-1028%3A+Docker+Official+Image+for+Apache+Kafka

    This PR adds the following functionalities:
    Introduces support for Apache Kafka Docker Official Images via:

    GitHub Workflows:

    - Workflow to prepare static source files for Docker images
    - Workflow to build and test Docker official images
    - Scripts to prepare source files and perform Docker image builds and tests

    A new directory for Docker official images, named docker/docker_official_images. This is the new directory to house all Docker Official Image assets.

    Co-authored-by: Vedarth Sharma <[email protected]>

    Reviewers: Manikumar Reddy <[email protected]>, Vedarth Sharma <[email protected]>

commit 0143c72
Author: Lianet Magrans <[email protected]>
Date:   Fri May 24 14:19:43 2024 +0200

    KAFKA-16815: Handle FencedInstanceId in HB response (apache#16047)

    Handle FencedInstanceIdException that a consumer may receive in the heartbeat response. This will be the case when a static consumer is removed from the group by and admin client, and another member joins with the same group.instance.id (allowed in). The first member will receive a FencedInstanceId on its next heartbeat. The expectation is that this should be handled as a fatal error.

    There are no actual changes in logic with this PR, given that without being handled, the FencedInstanceId was being treated as an "unexpected error", which are all treated as fatal errors, so the outcome remains the same. But we're introducing this small change just for accuracy in the logic and the logs: FencedInstanceId is expected during heartbeat, a log line is shown describing the situation and why it happened (and it's treated as a fatal error, just like it was before this PR).

    This PR also improves the test to ensure that the error propagated to the app thread matches the one received in the HB.

    Reviewers: Andrew Schofield <[email protected]>, David Jacot <[email protected]>

commit c5cd190
Author: Gantigmaa Selenge <[email protected]>
Date:   Fri May 24 11:50:47 2024 +0100

    MINOR: Refactor SSL/SASL admin integration tests to not use a custom authorizer (apache#15377)

    Reviewers: Mickael Maison <[email protected]>

commit 520aa86
Author: Jeff Kim <[email protected]>
Date:   Fri May 24 03:51:50 2024 -0400

    KAFKA-16626; Lazily convert subscribed topic names to topic ids (apache#15970)

    This patch aims to remove the data structure that stores the conversion from topic names to topic ids which was taking time similar to the actual assignment computation. Instead, we reuse the already existing ConsumerGroupMember.subscribedTopicNames() and do the conversion to topic ids when the iterator is requested.

    Reviewers: David Jacot <[email protected]>

commit 6941598
Author: Krishna Agarwal <[email protected]>
Date:   Fri May 24 12:16:01 2024 +0530

    KAFKA-16826: Integrate Native Docker Image with github actions (apache#16045)

    This PR integrates the Native docker image with the existing github action jobs for the jvm docker image of AK.

    The integration is done to the following actions:

    docker_build_and_test.yml: Builds the docker image and runs sanity tests and CVE scan
    docker_rc_release.yml: Builds the RC docker image for both amd and arm platform and pushes it to the dockerhub.
    docker_promote.yml: Promotes the RC docker image to the released image tag

    Reviewers: Manikumar Reddy <[email protected]>, Vedarth Sharma <[email protected]>

commit de32028
Author: Kuan-Po (Cooper) Tseng <[email protected]>
Date:   Fri May 24 05:25:53 2024 +0800

    KAFKA-16828 RackAwareTaskAssignorTest failed (apache#16044)

    Reviewers: Chia-Ping Tsai <[email protected]>

commit 11ad5e8
Author: Greg Harris <[email protected]>
Date:   Thu May 23 13:23:18 2024 -0700

    MINOR: Refactor Values class to fix checkstyle, add benchmark, optimize exceptions (apache#15469)

    Signed-off-by: Greg Harris <[email protected]>
    Reviewers: Mickael Maison <[email protected]>
chiacyu pushed a commit to chiacyu/kafka that referenced this pull request Jun 1, 2024
apache#15951)

When doing alter replica logDirs, we'll create a future log and pause log cleaning for the partition( here). And this log cleaning pausing will resume after alter replica logDirs completes (here). And when in the resuming log cleaning, we'll decrement 1 for the LogCleaningPaused count. Once the count reached 0, the cleaning pause is really resuming. (here). For more explanation about the logCleaningPaused state can check here.

But, there's still one factor that could increase the LogCleaningPaused count: leadership change (here). When there's a leadership change, we'll check if there's a future log in this partition, if so, we'll create future log and pauseCleaning (LogCleaningPaused count + 1). So, if during the alter replica logDirs:

1. alter replica logDirs for tp0 triggered (LogCleaningPaused count = 1)
2. tp0 leadership changed (LogCleaningPaused count = 2)
3. alter replica logDirs completes, resuming logCleaning (LogCleaningPaused count = 1)
4. LogCleaning keeps paused because the count is always >  0

This PR fixes this issue by only abortAndPauseCleaning when future log is not existed. We did the same check in alterReplicaLogDirs. So this change can make sure there's only 1 abortAndPauseCleaning for either abortAndPauseCleaning or maybeAddLogDirFetchers. Tests also added.

Reviewers: Chia-Ping Tsai <[email protected]>, Igor Soarez <[email protected]>
wernerdv pushed a commit to wernerdv/kafka that referenced this pull request Jun 3, 2024
apache#15951)

When doing alter replica logDirs, we'll create a future log and pause log cleaning for the partition( here). And this log cleaning pausing will resume after alter replica logDirs completes (here). And when in the resuming log cleaning, we'll decrement 1 for the LogCleaningPaused count. Once the count reached 0, the cleaning pause is really resuming. (here). For more explanation about the logCleaningPaused state can check here.

But, there's still one factor that could increase the LogCleaningPaused count: leadership change (here). When there's a leadership change, we'll check if there's a future log in this partition, if so, we'll create future log and pauseCleaning (LogCleaningPaused count + 1). So, if during the alter replica logDirs:

1. alter replica logDirs for tp0 triggered (LogCleaningPaused count = 1)
2. tp0 leadership changed (LogCleaningPaused count = 2)
3. alter replica logDirs completes, resuming logCleaning (LogCleaningPaused count = 1)
4. LogCleaning keeps paused because the count is always >  0

This PR fixes this issue by only abortAndPauseCleaning when future log is not existed. We did the same check in alterReplicaLogDirs. So this change can make sure there's only 1 abortAndPauseCleaning for either abortAndPauseCleaning or maybeAddLogDirFetchers. Tests also added.

Reviewers: Chia-Ping Tsai <[email protected]>, Igor Soarez <[email protected]>
TaiJuWu pushed a commit to TaiJuWu/kafka that referenced this pull request Jun 8, 2024
apache#15951)

When doing alter replica logDirs, we'll create a future log and pause log cleaning for the partition( here). And this log cleaning pausing will resume after alter replica logDirs completes (here). And when in the resuming log cleaning, we'll decrement 1 for the LogCleaningPaused count. Once the count reached 0, the cleaning pause is really resuming. (here). For more explanation about the logCleaningPaused state can check here.

But, there's still one factor that could increase the LogCleaningPaused count: leadership change (here). When there's a leadership change, we'll check if there's a future log in this partition, if so, we'll create future log and pauseCleaning (LogCleaningPaused count + 1). So, if during the alter replica logDirs:

1. alter replica logDirs for tp0 triggered (LogCleaningPaused count = 1)
2. tp0 leadership changed (LogCleaningPaused count = 2)
3. alter replica logDirs completes, resuming logCleaning (LogCleaningPaused count = 1)
4. LogCleaning keeps paused because the count is always >  0

This PR fixes this issue by only abortAndPauseCleaning when future log is not existed. We did the same check in alterReplicaLogDirs. So this change can make sure there's only 1 abortAndPauseCleaning for either abortAndPauseCleaning or maybeAddLogDirFetchers. Tests also added.

Reviewers: Chia-Ping Tsai <[email protected]>, Igor Soarez <[email protected]>
gongxuanzhang pushed a commit to gongxuanzhang/kafka that referenced this pull request Jun 12, 2024
apache#15951)

When doing alter replica logDirs, we'll create a future log and pause log cleaning for the partition( here). And this log cleaning pausing will resume after alter replica logDirs completes (here). And when in the resuming log cleaning, we'll decrement 1 for the LogCleaningPaused count. Once the count reached 0, the cleaning pause is really resuming. (here). For more explanation about the logCleaningPaused state can check here.

But, there's still one factor that could increase the LogCleaningPaused count: leadership change (here). When there's a leadership change, we'll check if there's a future log in this partition, if so, we'll create future log and pauseCleaning (LogCleaningPaused count + 1). So, if during the alter replica logDirs:

1. alter replica logDirs for tp0 triggered (LogCleaningPaused count = 1)
2. tp0 leadership changed (LogCleaningPaused count = 2)
3. alter replica logDirs completes, resuming logCleaning (LogCleaningPaused count = 1)
4. LogCleaning keeps paused because the count is always >  0

This PR fixes this issue by only abortAndPauseCleaning when future log is not existed. We did the same check in alterReplicaLogDirs. So this change can make sure there's only 1 abortAndPauseCleaning for either abortAndPauseCleaning or maybeAddLogDirFetchers. Tests also added.

Reviewers: Chia-Ping Tsai <[email protected]>, Igor Soarez <[email protected]>
Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@showuon : Thanks for the PR and sorry for the late review. Just a few minor comments.

@@ -427,7 +427,7 @@ class Partition(val topicPartition: TopicPartition,
* @param highWatermarkCheckpoints Checkpoint to load initial high watermark from
* @return true iff the future replica is created
*/
def maybeCreateFutureReplica(logDir: String, highWatermarkCheckpoints: OffsetCheckpoints): Boolean = {
def maybeCreateFutureReplica(logDir: String, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid] = topicId): Boolean = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add the new param to the javadoc?

if (futureReplicasAndInitialOffset.nonEmpty)
if (futureReplicasAndInitialOffset.nonEmpty) {
// Even though it's possible that there is another thread adding fetcher for this future log partition,
// but it's fine because `BrokerIdAndFetcherId` will be identical and the operation will be no-op.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the leader changes, we need to propagate the new leader epoch to ReplicaAlterLogDirsThread (see #8223). So, the operation is not a no-op?


futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
partition.getLeaderEpoch, futureLog.highWatermark))
}
}
}

if (futureReplicasAndInitialOffset.nonEmpty)
if (futureReplicasAndInitialOffset.nonEmpty) {
// Even though it's possible that there is another thread adding fetcher for this future log partition,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, becomeLeaderOrFollower(), alterReplicaLogDirs(), and applyDelta() are done under the replicaStateChangeLock. Is it really possible for another thread to add fetcher for the future log?

// pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
// replica from source dir to destination dir
logManager.abortAndPauseCleaning(topicPartition)
// Add future replica log to partition's map if it's not existed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it's not existed => if it doesn't exist

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants