Skip to content

Commit

Permalink
KAFKA-15441 Allow broker heartbeats to complete in metadata transacti…
Browse files Browse the repository at this point in the history
…on (apache#14351)

This patch allows broker heartbeat events to be completed while a metadata transaction is in-flight.

More generally, this patch allows any RUNS_IN_PREMIGRATION event to complete while the controller
is in pre-migration mode even if the migration transaction is in-flight.

We had a problem with broker heartbeats timing out because they could not be completed while a large
ZK migration transaction was in-flight. This resulted in the controller fencing all the ZK brokers which 
has many undesirable downstream effects. 

Reviewers: Akhilesh Chaganti <[email protected]>, Colin Patrick McCabe <[email protected]>
  • Loading branch information
mumrah authored Sep 8, 2023
1 parent 84c49c6 commit b24ccd6
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ package kafka.zk

import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
import kafka.server.{ConfigType, KafkaConfig}
import kafka.test.ClusterInstance
import kafka.test.annotation.{AutoStart, ClusterConfigProperty, ClusterTest, Type}
import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance}
import kafka.test.annotation.{AutoStart, ClusterConfigProperty, ClusterTemplate, ClusterTest, Type}
import kafka.test.junit.ClusterTestExtensions
import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
Expand Down Expand Up @@ -55,6 +55,26 @@ import java.util.{Properties, UUID}
import scala.collection.Seq
import scala.jdk.CollectionConverters._

object ZkMigrationIntegrationTest {
def addZkBrokerProps(props: Properties): Unit = {
props.setProperty("inter.broker.listener.name", "EXTERNAL")
props.setProperty("listeners", "PLAINTEXT://localhost:0,EXTERNAL://localhost:0")
props.setProperty("advertised.listeners", "PLAINTEXT://localhost:0,EXTERNAL://localhost:0")
props.setProperty("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
}

def zkClustersForAllMigrationVersions(clusterGenerator: ClusterGenerator): Unit = {
Seq(MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_5_IV2, MetadataVersion.IBP_3_6_IV2).foreach { mv =>
val clusterConfig = ClusterConfig.defaultClusterBuilder()
.metadataVersion(mv)
.brokers(3)
.`type`(Type.ZK)
.build()
addZkBrokerProps(clusterConfig.serverProperties())
clusterGenerator.accept(clusterConfig)
}
}
}

@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@Timeout(300)
Expand Down Expand Up @@ -308,12 +328,7 @@ class ZkMigrationIntegrationTest {
}

// SCRAM and Quota are intermixed. Test Quota Only here
@ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
new ClusterConfigProperty(key = "listener.security.protocol.map", value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
))
@ClusterTemplate("zkClustersForAllMigrationVersions")
def testDualWrite(zkCluster: ClusterInstance): Unit = {
// Create a topic in ZK mode
var admin = zkCluster.createAdminClient()
Expand All @@ -334,7 +349,7 @@ class ZkMigrationIntegrationTest {
val clusterId = zkCluster.clusterId()
val kraftCluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0).
setBootstrapMetadataVersion(zkCluster.config().metadataVersion()).
setClusterId(Uuid.fromString(clusterId)).
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.kafka.controller.QuorumController.ControllerOperationFlag.DOES_NOT_UPDATE_QUEUE_TIME;
import static org.apache.kafka.controller.QuorumController.ControllerOperationFlag.COMPLETES_IN_TRANSACTION;
import static org.apache.kafka.controller.QuorumController.ControllerOperationFlag.RUNS_IN_PREMIGRATION;


Expand Down Expand Up @@ -684,21 +683,16 @@ enum ControllerOperationFlag {
* Operations without this flag will always return NOT_CONTROLLER when invoked in premigration
* mode.
* <p>
* In pre-migration mode, we are still waiting to load the metadata from Apache
* ZooKeeper into the metadata log. Therefore, the metadata log is mostly empty,
* even though the cluster really does have metadata. Very few operations should
* use this flag.
*/
RUNS_IN_PREMIGRATION,

/**
* This flag signifies that an event will be completed even if it is part of an unfinished transaction.
* This is needed for metadata transactions so that external callers can add records to a transaction
* and still use the returned future. One example usage of this flag is the batches of migrations records.
* The migration driver needs to wait on each submitted batch to avoid overwhelming the controller queue
* with events, so it needs events to be completed based on the committed (i.e., not stable) offset.
* In pre-migration mode, we are still waiting to load the metadata from Apache ZooKeeper into
* the metadata log. Therefore, the metadata log is mostly empty, even though the cluster really
* does have metadata
* <p>
* Events using this flag will be completed even if a transaction is ongoing. Pre-migration
* events will be completed using the unstable (committed) offset rather than the stable offset.
* <p>
* In practice, very few operations should use this flag.
*/
COMPLETES_IN_TRANSACTION
RUNS_IN_PREMIGRATION
}

interface ControllerWriteOperation<T> {
Expand Down Expand Up @@ -779,7 +773,14 @@ public void run() throws Exception {
// If the operation did not return any records, then it was actually just
// a read after all, and not a read + write. However, this read was done
// from the latest in-memory state, which might contain uncommitted data.
OptionalLong maybeOffset = deferredEventQueue.highestPendingOffset();
// If the operation can complete within a transaction, let it use the
// unstable purgatory so that it can complete sooner.
OptionalLong maybeOffset;
if (featureControl.inPreMigrationMode() && flags.contains(RUNS_IN_PREMIGRATION)) {
maybeOffset = deferredUnstableEventQueue.highestPendingOffset();
} else {
maybeOffset = deferredEventQueue.highestPendingOffset();
}
if (!maybeOffset.isPresent()) {
// If the purgatory is empty, there are no pending operations and no
// uncommitted state. We can complete immediately.
Expand Down Expand Up @@ -841,7 +842,7 @@ public Long apply(List<ApiMessageAndVersion> records) {

// Remember the latest offset and future if it is not already completed
if (!future.isDone()) {
if (flags.contains(COMPLETES_IN_TRANSACTION)) {
if (featureControl.inPreMigrationMode() && flags.contains(RUNS_IN_PREMIGRATION)) {
deferredUnstableEventQueue.add(resultAndOffset.offset(), this);
} else {
deferredEventQueue.add(resultAndOffset.offset(), this);
Expand Down Expand Up @@ -961,9 +962,7 @@ <T> CompletableFuture<T> appendWriteEvent(
}

class MigrationRecordConsumer implements ZkRecordConsumer {
private final EnumSet<ControllerOperationFlag> eventFlags = EnumSet.of(
RUNS_IN_PREMIGRATION, COMPLETES_IN_TRANSACTION
);
private final EnumSet<ControllerOperationFlag> eventFlags = EnumSet.of(RUNS_IN_PREMIGRATION);

private volatile OffsetAndEpoch highestMigrationRecordOffset;

Expand Down Expand Up @@ -1313,7 +1312,7 @@ private void rescheduleMaybeFenceStaleBrokers() {
rescheduleMaybeFenceStaleBrokers();
return result;
},
EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME, RUNS_IN_PREMIGRATION));
EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME));
}

private void cancelMaybeFenceReplicas() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ public void run() throws Exception {
transitionTo(MigrationDriverState.SYNC_KRAFT_TO_ZK);
} catch (Throwable t) {
MigrationManifest partialManifest = manifestBuilder.build();
log.error("Aborting the metadata migration from ZooKeeper to KRaft. {}.", partialManifest);
log.error("Aborting the metadata migration from ZooKeeper to KRaft. {}.", partialManifest, t);
zkRecordConsumer.abortMigration(); // This terminates the controller via fatal fault handler
super.handleException(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -1491,6 +1492,7 @@ public void testActivationRecordsPartialTransactionNoSupport() {
setAddingReplicas(Collections.emptyList()).setLeader(1).setLeaderEpoch(0).
setPartitionEpoch(0), (short) 0)
));

@Test
public void testFailoverDuringMigrationTransaction() throws Exception {
try (
Expand Down Expand Up @@ -1534,4 +1536,62 @@ public void testFailoverDuringMigrationTransaction() throws Exception {

}
}

@ParameterizedTest
@EnumSource(value = MetadataVersion.class, names = {"IBP_3_4_IV0", "IBP_3_5_IV0", "IBP_3_6_IV0", "IBP_3_6_IV1"})
public void testBrokerHeartbeatDuringMigration(MetadataVersion metadataVersion) throws Exception {
try (
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
) {
QuorumControllerTestEnv.Builder controlEnvBuilder = new QuorumControllerTestEnv.Builder(logEnv).
setControllerBuilderInitializer(controllerBuilder ->
controllerBuilder
.setZkMigrationEnabled(true)
.setMaxIdleIntervalNs(OptionalLong.of(TimeUnit.MILLISECONDS.toNanos(100)))
).
setBootstrapMetadata(BootstrapMetadata.fromVersion(metadataVersion, "test"));
QuorumControllerTestEnv controlEnv = controlEnvBuilder.build();
QuorumController active = controlEnv.activeController(true);

// Register a ZK broker
BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
setBrokerId(0).
setRack(null).
setClusterId(active.clusterId()).
setIsMigratingZkBroker(true).
setFeatures(brokerFeatures(metadataVersion, metadataVersion)).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB0")).
setListeners(new ListenerCollection(Arrays.asList(new Listener().
setName("PLAINTEXT").setHost("localhost").
setPort(9092)).iterator()))).get();

// Start migration
ZkRecordConsumer migrationConsumer = active.zkRecordConsumer();
migrationConsumer.beginMigration().get(30, TimeUnit.SECONDS);

// Interleave migration batches with heartbeats. Ensure the heartbeat events use the correct
// offset when adding to the purgatory. Otherwise, we get errors like:
// There is already a deferred event with offset 292. We should not add one with an offset of 241 which is lower than that.
for (int i = 0; i < 100; i++) {
Uuid topicId = Uuid.randomUuid();
String topicName = "testBrokerHeartbeatDuringMigration" + i;
Future<?> migrationFuture = migrationConsumer.acceptBatch(
Arrays.asList(
new ApiMessageAndVersion(new TopicRecord().setTopicId(topicId).setName(topicName), (short) 0),
new ApiMessageAndVersion(new PartitionRecord().setTopicId(topicId).setPartitionId(0).setIsr(Arrays.asList(0, 1, 2)), (short) 0)));
active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
setWantFence(false).setBrokerEpoch(reply.epoch()).setBrokerId(0).
setCurrentMetadataOffset(100000L + i));
migrationFuture.get();
}

// Ensure that we can complete a heartbeat even though we leave migration transaction hanging
assertEquals(new BrokerHeartbeatReply(true, false, false, false),
active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
setWantFence(false).setBrokerEpoch(reply.epoch()).setBrokerId(0).
setCurrentMetadataOffset(100100L)).get());

}
}
}

0 comments on commit b24ccd6

Please sign in to comment.