Skip to content

Commit

Permalink
KAFKA-17506 KRaftMigrationDriver initialization race (apache#17147)
Browse files Browse the repository at this point in the history
There is a race condition between KRaftMigrationDriver running its first poll() and being notified by Raft about a leader change. If onControllerChange is called before RecoverMigrationStateFromZKEvent is run, we will end up getting stuck in the INACTIVE state.

This patch fixes the race by enqueuing a RecoverMigrationStateFromZKEvent from onControllerChange if the driver has not yet initialized. If another RecoverMigrationStateFromZKEvent was already enqueued, the second one to run will just be ignored.

Reviewers: Luke Chen <[email protected]>
  • Loading branch information
mumrah authored and tedyu committed Jan 6, 2025
1 parent 744ef48 commit cc46d9e
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,9 @@ public String name() {
@Override
public void onControllerChange(LeaderAndEpoch newLeaderAndEpoch) {
curLeaderAndEpoch = newLeaderAndEpoch;
if (migrationState.equals(MigrationDriverState.UNINITIALIZED)) {
eventQueue.append(new RecoverMigrationStateFromZKEvent());
}
eventQueue.append(new KRaftLeaderEvent(newLeaderAndEpoch));
}

Expand Down Expand Up @@ -519,8 +522,8 @@ public void run() throws Exception {
KRaftMigrationDriver.this.image = image;
String metadataType = isSnapshot ? "snapshot" : "delta";

if (migrationState.equals(MigrationDriverState.INACTIVE)) {
// No need to log anything if this node is not the active controller
if (EnumSet.of(MigrationDriverState.UNINITIALIZED, MigrationDriverState.INACTIVE).contains(migrationState)) {
// No need to log anything if this node is not the active controller or the driver has not initialized
completionHandler.accept(null);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,32 @@ CompletableFuture<Void> enqueueMetadataChangeEventWithFuture(
return future;
}

@Test
public void testOnControllerChangeWhenUninitialized() throws InterruptedException {
CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
CapturingMigrationClient.newBuilder().build();
CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder().build();
MockFaultHandler faultHandler = new MockFaultHandler("testBecomeLeaderUninitialized");
KRaftMigrationDriver.Builder builder = defaultTestBuilder()
.setZkMigrationClient(migrationClient)
.setPropagator(metadataPropagator)
.setFaultHandler(faultHandler);
try (KRaftMigrationDriver driver = builder.build()) {
// Fake a complete migration with ZK client
migrationClient.setMigrationRecoveryState(
ZkMigrationLeadershipState.EMPTY.withKRaftMetadataOffsetAndEpoch(100, 1));

// simulate the Raft layer running before the driver has fully started.
driver.onControllerChange(new LeaderAndEpoch(OptionalInt.of(3000), 1));

// start up the driver. this will enqueue a poll event. once run, this will enqueue a recovery event
driver.start();

// Even though we contrived a race above, the driver still makes it past initialization.
TestUtils.waitForCondition(() -> driver.migrationState().get(30, TimeUnit.SECONDS).equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM),
"Waiting for KRaftMigrationDriver to enter WAIT_FOR_CONTROLLER_QUORUM state");
}
}
/**
* Don't send RPCs to brokers for every metadata change, only when brokers or topics change.
* This is a regression test for KAFKA-14668
Expand Down

0 comments on commit cc46d9e

Please sign in to comment.