KAFKA-16518: Implement KIP-853 flags for storage-tool.sh
As part of KIP-853, this PR adds two new flags to storage-tool.sh: --standalone and --initial-voters.

There are currently two valid ways to format a cluster:

- The pre-KIP-853 way, where you use a statically configured controller quorum. In this case,
  neither --standalone nor --initial-voters may be specified, and kraft.version must be set to 0.

- The KIP-853 way, where exactly one of --standalone or --initial-voters must be specified,
  supplying the initial membership of the dynamic controller quorum. In this case, kraft.version
  must be set to 1. (Example invocations for both paths are sketched below.)
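
To make the two paths concrete, here is a hedged sketch of the corresponding invocations. The
format subcommand and the -t/-c options appear in the test diffs below; the id@host:port:directory-id
voter syntax follows KIP-853, and all placeholder values are illustrative:

    # Pre-KIP-853 path: static quorum from controller.quorum.voters; neither new flag is passed.
    ./storage-tool.sh format -t <cluster-id> -c controller.properties

    # KIP-853 path, single controller: format this node as the sole initial voter.
    ./storage-tool.sh format -t <cluster-id> -c controller.properties --standalone

    # KIP-853 path, multiple controllers: spell out the initial voter set.
    ./storage-tool.sh format -t <cluster-id> -c controller.properties \
        --initial-voters 0@controller-0:9093:K90IZ-0DRNazJ49kCZ1EMQ,1@controller-1:9093:aUARLskQTDW4qOZBi8-DDQ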

This PR moves the formatting logic out of StorageTool.scala and into Formatter.java. The tool file
was never intended to get so huge, or to implement complex logic like generating metadata records.
Those things should be done by code in the metadata or raft gradle modules. This is also useful for
junit tests, which often need to do formatting. (The 'info' and 'random-uuid' commands remain in
StorageTool.scala, for now.)
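
For junit-style use, a minimal sketch of the new Formatter flow, using only the calls visible in
the test diffs below (setClusterId, setNodeId, addDirectory, run, bootstrapMetadata, clusterId);
the object name and temp-dir handling are illustrative:

    import java.nio.file.Files
    import org.apache.kafka.common.Uuid
    import org.apache.kafka.metadata.storage.Formatter

    object FormatterSketch {
      def main(args: Array[String]): Unit = {
        // Format a fresh metadata directory, as the updated test harness does.
        val metadataDir = Files.createTempDirectory("kraft-test").toFile
        val formatter = new Formatter().
          setClusterId(Uuid.randomUuid().toString).
          setNodeId(1)
        formatter.addDirectory(metadataDir.getAbsolutePath)
        formatter.run()
        // The harness hands these to the controller under test.
        println(s"formatted cluster ${formatter.clusterId()}")
        println(formatter.bootstrapMetadata())
      }
    }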
muralibasani authored and cmccabe committed Jul 25, 2024
1 parent 7efb58f commit d5ea780
Showing 14 changed files with 1,434 additions and 934 deletions.
checkstyle/suppressions.xml (2 changes: 1 addition & 1 deletion)
@@ -323,7 +323,7 @@
     <suppress checks="(ParameterNumber|ClassDataAbstractionCoupling)"
               files="(QuorumController).java"/>
     <suppress checks="(CyclomaticComplexity|NPathComplexity)"
-              files="(PartitionRegistration|PartitionChangeBuilder).java"/>
+              files="(PartitionRegistration|PartitionChangeBuilder|ScramParser).java"/>
     <suppress checks="CyclomaticComplexity"
               files="(ClientQuotasImage|KafkaEventQueue|MetadataDelta|QuorumController|ReplicationControlManager|KRaftMigrationDriver|ClusterControlManager|MetaPropertiesEnsemble).java"/>
     <suppress checks="NPathComplexity"
core/src/main/scala/kafka/tools/StorageTool.scala (394 changes: 45 additions & 349 deletions)

Large diffs are not rendered by default.
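
Since this diff is not rendered, here is a hypothetical sketch of the flag validation the commit
message describes; every name and the signature are assumptions, not the actual StorageTool code:

    // Hypothetical sketch of the mutual-exclusion rule between the two
    // formatting paths; names and signature are illustrative only.
    def validateQuorumFlags(standalone: Boolean,
                            initialVoters: Option[String],
                            kraftVersion: Short): Unit = {
      if (standalone && initialVoters.isDefined)
        throw new IllegalArgumentException("--standalone and --initial-voters are mutually exclusive")
      val dynamicQuorum = standalone || initialVoters.isDefined
      if (dynamicQuorum && kraftVersion != 1)
        throw new IllegalArgumentException("--standalone or --initial-voters requires kraft.version=1")
      if (!dynamicQuorum && kraftVersion != 0)
        throw new IllegalArgumentException("a statically configured quorum requires kraft.version=0")
    }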

core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala
@@ -18,21 +18,19 @@ package kafka.api
 
 import java.util.Properties
 import kafka.utils._
-import kafka.tools.StorageTool
 import kafka.zk.ConfigEntityChangeNotificationZNode
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, CreateDelegationTokenOptions, ScramCredentialInfo, UserScramCredentialAlteration, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism}
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.common.security.token.delegation.DelegationToken
+import org.apache.kafka.metadata.storage.Formatter
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 import org.junit.jupiter.api.{BeforeEach, TestInfo}
 
 import scala.jdk.CollectionConverters._
-import scala.collection.mutable.ArrayBuffer
-import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.server.config.DelegationTokenManagerConfigs
 
 class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
@@ -69,17 +67,11 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest
   }
 
   // Create the admin credentials for KRaft as part of controller initialization
-  override def optionalMetadataRecords: Option[ArrayBuffer[ApiMessageAndVersion]] = {
-    val args = Seq("format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ", "-S",
-      s"SCRAM-SHA-256=[name=${JaasTestUtils.KafkaScramAdmin},password=${JaasTestUtils.KafkaScramAdminPassword}]")
-    val namespace = StorageTool.parseArguments(args.toArray)
-    val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer()
-    StorageTool.getUserScramCredentialRecords(namespace).foreach {
-      userScramCredentialRecords => for (record <- userScramCredentialRecords) {
-        metadataRecords.append(new ApiMessageAndVersion(record, 0.toShort))
-      }
-    }
-    Some(metadataRecords)
+  override def addFormatterSettings(formatter: Formatter): Unit = {
+    formatter.setClusterId("XcZZOzUqS4yHOjhMQB6JLQ")
+    formatter.setScramArguments(
+      List(s"SCRAM-SHA-256=[name=${JaasTestUtils.KafkaScramAdmin},password=${JaasTestUtils.KafkaScramAdminPassword}]").asJava)
   }
 
   override def createPrivilegedAdminClient(): Admin = createScramAdminClient(kafkaClientSaslMechanism, kafkaPrincipal.getName, kafkaPassword)
core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
@@ -17,12 +17,11 @@
 package kafka.api
 
 import java.util.Properties
-
 import kafka.utils._
-import kafka.tools.StorageTool
 import kafka.zk.ConfigEntityChangeNotificationZNode
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.scram.internals.ScramMechanism
+import org.apache.kafka.metadata.storage.Formatter
 import org.apache.kafka.test.TestSslUtils
 
 import scala.jdk.CollectionConverters._
@@ -31,9 +30,6 @@ import org.junit.jupiter.api.{BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
-import scala.collection.mutable.ArrayBuffer
-import org.apache.kafka.server.common.ApiMessageAndVersion
-
 class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest {
   override protected def kafkaClientSaslMechanism = "SCRAM-SHA-256"
   override protected def kafkaServerSaslMechanisms = ScramMechanism.mechanismNames.asScala.toList
@@ -55,17 +51,10 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
   }
 
   // Create the admin credentials for KRaft as part of controller initialization
-  override def optionalMetadataRecords: Option[ArrayBuffer[ApiMessageAndVersion]] = {
-    val args = Seq("format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ", "-S",
-      s"SCRAM-SHA-256=[name=${JaasTestUtils.KafkaScramAdmin},password=${JaasTestUtils.KafkaScramAdminPassword}]")
-    val namespace = StorageTool.parseArguments(args.toArray)
-    val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer()
-    StorageTool.getUserScramCredentialRecords(namespace).foreach {
-      userScramCredentialRecords => for (record <- userScramCredentialRecords) {
-        metadataRecords.append(new ApiMessageAndVersion(record, 0.toShort))
-      }
-    }
-    Some(metadataRecords)
+  override def addFormatterSettings(formatter: Formatter): Unit = {
+    formatter.setClusterId("XcZZOzUqS4yHOjhMQB6JLQ")
+    formatter.setScramArguments(List(
+      s"SCRAM-SHA-256=[name=${JaasTestUtils.KafkaScramAdmin},password=${JaasTestUtils.KafkaScramAdminPassword}]").asJava)
   }
 
   override def configureListeners(props: collection.Seq[Properties]): Unit = {
core/src/test/scala/unit/kafka/server/QuorumTestHarness.scala
@@ -26,18 +26,17 @@ import javax.security.auth.login.Configuration
 import kafka.utils.{CoreUtils, Logging, TestInfoUtils, TestUtils}
 import kafka.zk.{AdminZkClient, EmbeddedZookeeper, KafkaZkClient}
 import org.apache.kafka.clients.consumer.GroupProtocol
-import org.apache.kafka.common.metadata.FeatureLevelRecord
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{Exit, Time}
 import org.apache.kafka.common.{DirectoryId, Uuid}
-import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
 import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.{REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR}
 import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion}
+import org.apache.kafka.metadata.storage.Formatter
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.QuorumConfig
-import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, ServerLogConfigs}
 import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
 import org.apache.zookeeper.client.ZKClientConfig
@@ -46,9 +45,7 @@ import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag, TestInfo}
 
 import java.nio.file.{Files, Paths}
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.ListBuffer
-import scala.collection.{Seq, immutable}
+import scala.collection.Seq
 import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
 
@@ -104,7 +101,7 @@ class KRaftQuorumImplementation(
   ): KafkaBroker = {
     val metaPropertiesEnsemble = {
       val loader = new MetaPropertiesEnsemble.Loader()
-        .addLogDirs(config.logDirs.asJava)
+      loader.addLogDirs(config.logDirs.asJava)
       loader.addMetadataLogDir(config.metadataLogDir)
       val ensemble = loader.load()
       val copier = new MetaPropertiesEnsemble.Copier(ensemble)
@@ -186,8 +183,6 @@ abstract class QuorumTestHarness extends Logging {
   private var testInfo: TestInfo = _
   protected var implementation: QuorumImplementation = _
 
-  val bootstrapRecords: ListBuffer[ApiMessageAndVersion] = ListBuffer()
-
   def isKRaftTest(): Boolean = {
     TestInfoUtils.isKRaft(testInfo)
   }
@@ -313,7 +308,7 @@ abstract class QuorumTestHarness extends Logging {
       CoreUtils.swallow(kRaftQuorumImplementation.controllerServer.shutdown(), kRaftQuorumImplementation.log)
   }
 
-  def optionalMetadataRecords: Option[ArrayBuffer[ApiMessageAndVersion]] = None
+  def addFormatterSettings(formatter: Formatter): Unit = {}
 
   private def newKRaftQuorum(testInfo: TestInfo): KRaftQuorumImplementation = {
     newKRaftQuorum(new Properties())
@@ -333,24 +328,16 @@
       props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1000")
     }
     val nodeId = Integer.parseInt(props.getProperty(KRaftConfigs.NODE_ID_CONFIG))
-    val metadataDir = TestUtils.tempDir()
-    val metaProperties = new MetaProperties.Builder().
-      setVersion(MetaPropertiesVersion.V1).
+    val formatter = new Formatter().
       setClusterId(Uuid.randomUuid().toString).
-      setNodeId(nodeId).
-      build()
-    TestUtils.formatDirectories(immutable.Seq(metadataDir.getAbsolutePath), metaProperties, metadataVersion, optionalMetadataRecords)
-
-    val metadataRecords = new util.ArrayList[ApiMessageAndVersion]
-    metadataRecords.add(new ApiMessageAndVersion(new FeatureLevelRecord().
-      setName(MetadataVersion.FEATURE_NAME).
-      setFeatureLevel(metadataVersion.featureLevel()), 0.toShort))
-
-    optionalMetadataRecords.foreach { metadataArguments =>
-      for (record <- metadataArguments) metadataRecords.add(record)
-    }
-
-    val bootstrapMetadata = BootstrapMetadata.fromRecords(metadataRecords, "test harness")
+      setNodeId(nodeId)
+    val metadataDir = TestUtils.tempDir()
+    formatter.addDirectory(metadataDir.getAbsolutePath)
+    //if (isNewGroupCoordinatorEnabled()) {
+    //  formatter.setFeatureLevel(Features.GROUP_VERSION.featureName, Features.GROUP_VERSION.latestTesting)
+    //}
+    formatter.run()
+    val bootstrapMetadata = formatter.bootstrapMetadata()
 
     props.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, metadataDir.getAbsolutePath)
     val proto = controllerListenerSecurityProtocol.toString
@@ -365,7 +352,7 @@ abstract class QuorumTestHarness extends Logging {
     val metaPropertiesEnsemble = new MetaPropertiesEnsemble.Loader().
       addMetadataLogDir(metadataDir.getAbsolutePath).
       load()
-    metaPropertiesEnsemble.verify(Optional.of(metaProperties.clusterId().get()),
+    metaPropertiesEnsemble.verify(Optional.of(formatter.clusterId()),
       OptionalInt.of(nodeId),
       util.EnumSet.of(REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR))
     val sharedServer = new SharedServer(
@@ -406,7 +393,7 @@ abstract class QuorumTestHarness extends Logging {
       faultHandlerFactory,
       metadataDir,
       controllerQuorumVotersFuture,
-      metaProperties.clusterId.get(),
+      formatter.clusterId(),
       this,
       faultHandler
     )
core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -39,17 +39,17 @@ import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicIdPartition, T
 import org.apache.kafka.image.{MetadataDelta, MetadataImage}
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.metadata.PartitionRegistration
-import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesVersion}
+import org.apache.kafka.metadata.storage.Formatter
 import org.apache.kafka.raft.QuorumConfig
-import org.apache.kafka.server.common.KRaftVersion
 import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
+import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion}
 import org.apache.kafka.server.util.{MockTime, ShutdownableThread}
 import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
 import org.mockito.Mockito
 
-import scala.collection.{immutable, mutable}
+import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 import scala.util.Random
 
@@ -159,12 +159,11 @@ class ReplicaManagerConcurrencyTest extends Logging {
     metadataCache: MetadataCache,
   ): ReplicaManager = {
     val logDir = TestUtils.tempDir()
-    val metaProperties = new MetaProperties.Builder().
-      setVersion(MetaPropertiesVersion.V1).
+    val formatter = new Formatter().
       setClusterId(Uuid.randomUuid().toString).
-      setNodeId(1).
-      build()
-    TestUtils.formatDirectories(immutable.Seq(logDir.getAbsolutePath), metaProperties, MetadataVersion.latestTesting(), None)
+      setNodeId(1)
+    formatter.addDirectory(logDir.getAbsolutePath)
+    formatter.run()
 
     val props = new Properties
     props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, "100@localhost:12345")
