From cc46d9e126e264f28135ce6974d847ff611b5bab Mon Sep 17 00:00:00 2001 From: David Arthur Date: Wed, 11 Sep 2024 10:41:49 -0400 Subject: [PATCH] KAFKA-17506 KRaftMigrationDriver initialization race (#17147) There is a race condition between KRaftMigrationDriver running its first poll() and being notified by Raft about a leader change. If onControllerChange is called before RecoverMigrationStateFromZKEvent is run, we will end up getting stuck in the INACTIVE state. This patch fixes the race by enqueuing a RecoverMigrationStateFromZKEvent from onControllerChange if the driver has not yet initialized. If another RecoverMigrationStateFromZKEvent was already enqueued, the second one to run will just be ignored. Reviewers: Luke Chen --- .../migration/KRaftMigrationDriver.java | 7 +++-- .../migration/KRaftMigrationDriverTest.java | 26 +++++++++++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java index 61b95c091d6dc..c00c25fd15854 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java @@ -359,6 +359,9 @@ public String name() { @Override public void onControllerChange(LeaderAndEpoch newLeaderAndEpoch) { curLeaderAndEpoch = newLeaderAndEpoch; + if (migrationState.equals(MigrationDriverState.UNINITIALIZED)) { + eventQueue.append(new RecoverMigrationStateFromZKEvent()); + } eventQueue.append(new KRaftLeaderEvent(newLeaderAndEpoch)); } @@ -519,8 +522,8 @@ public void run() throws Exception { KRaftMigrationDriver.this.image = image; String metadataType = isSnapshot ? "snapshot" : "delta"; - if (migrationState.equals(MigrationDriverState.INACTIVE)) { - // No need to log anything if this node is not the active controller + if (EnumSet.of(MigrationDriverState.UNINITIALIZED, MigrationDriverState.INACTIVE).contains(migrationState)) { + // No need to log anything if this node is not the active controller or the driver has not initialized completionHandler.accept(null); return; } diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java index b6e02fd6683a5..dd09a2432e65b 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java @@ -232,6 +232,32 @@ CompletableFuture enqueueMetadataChangeEventWithFuture( return future; } + @Test + public void testOnControllerChangeWhenUninitialized() throws InterruptedException { + CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator(); + CapturingMigrationClient.newBuilder().build(); + CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder().build(); + MockFaultHandler faultHandler = new MockFaultHandler("testBecomeLeaderUninitialized"); + KRaftMigrationDriver.Builder builder = defaultTestBuilder() + .setZkMigrationClient(migrationClient) + .setPropagator(metadataPropagator) + .setFaultHandler(faultHandler); + try (KRaftMigrationDriver driver = builder.build()) { + // Fake a complete migration with ZK client + migrationClient.setMigrationRecoveryState( + ZkMigrationLeadershipState.EMPTY.withKRaftMetadataOffsetAndEpoch(100, 1)); + + // simulate the Raft layer running before the driver has fully started. + driver.onControllerChange(new LeaderAndEpoch(OptionalInt.of(3000), 1)); + + // start up the driver. this will enqueue a poll event. once run, this will enqueue a recovery event + driver.start(); + + // Even though we contrived a race above, the driver still makes it past initialization. + TestUtils.waitForCondition(() -> driver.migrationState().get(30, TimeUnit.SECONDS).equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM), + "Waiting for KRaftMigrationDriver to enter WAIT_FOR_CONTROLLER_QUORUM state"); + } + } /** * Don't send RPCs to brokers for every metadata change, only when brokers or topics change. * This is a regression test for KAFKA-14668