Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16518: Implement KIP-853 flags for storage-tool.sh #16669

Merged
merged 7 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions checkstyle/import-control-metadata.xml
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@
<subpackage name="migration">
<allow pkg="org.apache.kafka.controller" />
</subpackage>
<subpackage name="storage">
<allow pkg="org.apache.kafka.common.internals" />
<allow pkg="org.apache.kafka.snapshot" />
</subpackage>
<subpackage name="util">
<allow class="org.apache.kafka.common.compress.Compression" exact-match="true" />
</subpackage>
Expand Down
4 changes: 2 additions & 2 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@

<!-- Raft -->
<suppress checks="NPathComplexity"
files="RecordsIterator.java"/>
files="(DynamicVoter|RecordsIterator).java"/>

<!-- Streams -->
<suppress checks="ClassFanOutComplexity"
Expand Down Expand Up @@ -325,7 +325,7 @@
<suppress checks="(ParameterNumber|ClassDataAbstractionCoupling)"
files="(QuorumController).java"/>
<suppress checks="(CyclomaticComplexity|NPathComplexity)"
files="(PartitionRegistration|PartitionChangeBuilder).java"/>
files="(PartitionRegistration|PartitionChangeBuilder|ScramParser).java"/>
<suppress checks="CyclomaticComplexity"
files="(ClientQuotasImage|KafkaEventQueue|MetadataDelta|QuorumController|ReplicationControlManager|KRaftMigrationDriver|ClusterControlManager|MetaPropertiesEnsemble).java"/>
<suppress checks="NPathComplexity"
Expand Down
418 changes: 71 additions & 347 deletions core/src/main/scala/kafka/tools/StorageTool.scala

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,19 @@ package kafka.api

import java.util.Properties
import kafka.utils._
import kafka.tools.StorageTool
import kafka.zk.ConfigEntityChangeNotificationZNode
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, CreateDelegationTokenOptions, ScramCredentialInfo, UserScramCredentialAlteration, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism}
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.DelegationToken
import org.apache.kafka.metadata.storage.Formatter
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.api.{BeforeEach, TestInfo}

import scala.jdk.CollectionConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.config.DelegationTokenManagerConfigs

class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
Expand Down Expand Up @@ -69,17 +67,11 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest
}

// Create the admin credentials for KRaft as part of controller initialization
override def optionalMetadataRecords: Option[ArrayBuffer[ApiMessageAndVersion]] = {
val args = Seq("format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ", "-S",
s"SCRAM-SHA-256=[name=${JaasTestUtils.KafkaScramAdmin},password=${JaasTestUtils.KafkaScramAdminPassword}]")
val namespace = StorageTool.parseArguments(args.toArray)
val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer()
StorageTool.getUserScramCredentialRecords(namespace).foreach {
userScramCredentialRecords => for (record <- userScramCredentialRecords) {
metadataRecords.append(new ApiMessageAndVersion(record, 0.toShort))
}
}
Some(metadataRecords)

override def addFormatterSettings(formatter: Formatter): Unit = {
formatter.setClusterId("XcZZOzUqS4yHOjhMQB6JLQ")
formatter.setScramArguments(
List(s"SCRAM-SHA-256=[name=${JaasTestUtils.KafkaScramAdmin},password=${JaasTestUtils.KafkaScramAdminPassword}]").asJava)
}

override def createPrivilegedAdminClient(): Admin = createScramAdminClient(kafkaClientSaslMechanism, kafkaPrincipal.getName, kafkaPassword)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@
package kafka.api

import java.util.Properties

import kafka.utils._
import kafka.tools.StorageTool
import kafka.zk.ConfigEntityChangeNotificationZNode
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.metadata.storage.Formatter
import org.apache.kafka.test.TestSslUtils

import scala.jdk.CollectionConverters._
Expand All @@ -31,9 +30,6 @@ import org.junit.jupiter.api.{BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

import scala.collection.mutable.ArrayBuffer
import org.apache.kafka.server.common.ApiMessageAndVersion

class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest {
override protected def kafkaClientSaslMechanism = "SCRAM-SHA-256"
override protected def kafkaServerSaslMechanisms = ScramMechanism.mechanismNames.asScala.toList
Expand All @@ -55,17 +51,10 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
}

// Create the admin credentials for KRaft as part of controller initialization
override def optionalMetadataRecords: Option[ArrayBuffer[ApiMessageAndVersion]] = {
val args = Seq("format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ", "-S",
s"SCRAM-SHA-256=[name=${JaasTestUtils.KafkaScramAdmin},password=${JaasTestUtils.KafkaScramAdminPassword}]")
val namespace = StorageTool.parseArguments(args.toArray)
val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer()
StorageTool.getUserScramCredentialRecords(namespace).foreach {
userScramCredentialRecords => for (record <- userScramCredentialRecords) {
metadataRecords.append(new ApiMessageAndVersion(record, 0.toShort))
}
}
Some(metadataRecords)
override def addFormatterSettings(formatter: Formatter): Unit = {
formatter.setClusterId("XcZZOzUqS4yHOjhMQB6JLQ")
formatter.setScramArguments(List(
s"SCRAM-SHA-256=[name=${JaasTestUtils.KafkaScramAdmin},password=${JaasTestUtils.KafkaScramAdminPassword}]").asJava)
}

override def configureListeners(props: collection.Seq[Properties]): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,17 @@ import javax.security.auth.login.Configuration
import kafka.utils.{CoreUtils, Logging, TestInfoUtils, TestUtils}
import kafka.zk.{AdminZkClient, EmbeddedZookeeper, KafkaZkClient}
import org.apache.kafka.clients.consumer.GroupProtocol
import org.apache.kafka.common.metadata.FeatureLevelRecord
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{Exit, Time}
import org.apache.kafka.common.{DirectoryId, Uuid}
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.{REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR}
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion}
import org.apache.kafka.metadata.storage.Formatter
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.common.{ApiMessageAndVersion, Features, MetadataVersion}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
import org.apache.zookeeper.client.ZKClientConfig
Expand All @@ -46,9 +45,7 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag, TestInfo}

import java.nio.file.{Files, Paths}
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.ListBuffer
import scala.collection.{Seq, immutable}
import scala.collection.Seq
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -104,7 +101,7 @@ class KRaftQuorumImplementation(
): KafkaBroker = {
val metaPropertiesEnsemble = {
val loader = new MetaPropertiesEnsemble.Loader()
.addLogDirs(config.logDirs.asJava)
loader.addLogDirs(config.logDirs.asJava)
loader.addMetadataLogDir(config.metadataLogDir)
val ensemble = loader.load()
val copier = new MetaPropertiesEnsemble.Copier(ensemble)
Expand Down Expand Up @@ -186,8 +183,6 @@ abstract class QuorumTestHarness extends Logging {
private var testInfo: TestInfo = _
protected var implementation: QuorumImplementation = _

val bootstrapRecords: ListBuffer[ApiMessageAndVersion] = ListBuffer()

def isKRaftTest(): Boolean = {
TestInfoUtils.isKRaft(testInfo)
}
Expand Down Expand Up @@ -313,7 +308,7 @@ abstract class QuorumTestHarness extends Logging {
CoreUtils.swallow(kRaftQuorumImplementation.controllerServer.shutdown(), kRaftQuorumImplementation.log)
}

def optionalMetadataRecords: Option[ArrayBuffer[ApiMessageAndVersion]] = None
def addFormatterSettings(formatter: Formatter): Unit = {}

private def newKRaftQuorum(testInfo: TestInfo): KRaftQuorumImplementation = {
newKRaftQuorum(new Properties())
Expand All @@ -334,31 +329,6 @@ abstract class QuorumTestHarness extends Logging {
}
val nodeId = Integer.parseInt(props.getProperty(KRaftConfigs.NODE_ID_CONFIG))
val metadataDir = TestUtils.tempDir()
val metaProperties = new MetaProperties.Builder().
setVersion(MetaPropertiesVersion.V1).
setClusterId(Uuid.randomUuid().toString).
setNodeId(nodeId).
build()
TestUtils.formatDirectories(immutable.Seq(metadataDir.getAbsolutePath), metaProperties, metadataVersion, optionalMetadataRecords)

val metadataRecords = new util.ArrayList[ApiMessageAndVersion]
metadataRecords.add(new ApiMessageAndVersion(new FeatureLevelRecord().
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(metadataVersion.featureLevel()), 0.toShort))

metadataRecords.add(new ApiMessageAndVersion(
new FeatureLevelRecord()
.setName(Features.TRANSACTION_VERSION.featureName)
.setFeatureLevel(Features.TRANSACTION_VERSION.latestTesting),
0.toShort
))

optionalMetadataRecords.foreach { metadataArguments =>
for (record <- metadataArguments) metadataRecords.add(record)
}

val bootstrapMetadata = BootstrapMetadata.fromRecords(metadataRecords, "test harness")

props.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, metadataDir.getAbsolutePath)
val proto = controllerListenerSecurityProtocol.toString
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, s"CONTROLLER:$proto")
Expand All @@ -368,11 +338,26 @@ abstract class QuorumTestHarness extends Logging {
// Setting the configuration to the same value set on the brokers via TestUtils to keep KRaft based and Zk based controller configs are consistent.
props.setProperty(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG, "1000")
val config = new KafkaConfig(props)

val formatter = new Formatter().
setClusterId(Uuid.randomUuid().toString).
setNodeId(nodeId)
formatter.addDirectory(metadataDir.getAbsolutePath)
formatter.setReleaseVersion(metadataVersion)
formatter.setUnstableFeatureVersionsEnabled(true)
formatter.setControllerListenerName(config.controllerListenerNames.head)
formatter.setMetadataLogDirectory(config.metadataLogDir)
addFormatterSettings(formatter)
//formatter.setFeatureLevel(Features.TRANSACTION_VERSION.featureName,
// Features.TRANSACTION_VERSION.defaultValue(metadataVersion))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uncommented code? Let's remove this if it is not needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

formatter.run()
val bootstrapMetadata = formatter.bootstrapMetadata()

val controllerQuorumVotersFuture = new CompletableFuture[util.Map[Integer, InetSocketAddress]]
val metaPropertiesEnsemble = new MetaPropertiesEnsemble.Loader().
addMetadataLogDir(metadataDir.getAbsolutePath).
load()
metaPropertiesEnsemble.verify(Optional.of(metaProperties.clusterId().get()),
metaPropertiesEnsemble.verify(Optional.of(formatter.clusterId()),
OptionalInt.of(nodeId),
util.EnumSet.of(REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR))
val sharedServer = new SharedServer(
Expand Down Expand Up @@ -413,7 +398,7 @@ abstract class QuorumTestHarness extends Logging {
faultHandlerFactory,
metadataDir,
controllerQuorumVotersFuture,
metaProperties.clusterId.get(),
formatter.clusterId(),
this,
faultHandler
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,17 @@ import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicIdPartition, T
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.metadata.PartitionRegistration
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesVersion}
import org.apache.kafka.metadata.storage.Formatter
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.common.KRaftVersion
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion}
import org.apache.kafka.server.util.{MockTime, ShutdownableThread}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
import org.mockito.Mockito

import scala.collection.{immutable, mutable}
import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.util.Random

Expand Down Expand Up @@ -159,12 +159,13 @@ class ReplicaManagerConcurrencyTest extends Logging {
metadataCache: MetadataCache,
): ReplicaManager = {
val logDir = TestUtils.tempDir()
val metaProperties = new MetaProperties.Builder().
setVersion(MetaPropertiesVersion.V1).
val formatter = new Formatter().
setClusterId(Uuid.randomUuid().toString).
setNodeId(1).
build()
TestUtils.formatDirectories(immutable.Seq(logDir.getAbsolutePath), metaProperties, MetadataVersion.latestTesting(), None)
setNodeId(1)
formatter.addDirectory(logDir.getAbsolutePath)
formatter.setControllerListenerName("CONTROLLER")
formatter.setMetadataLogDirectory(logDir.getAbsolutePath)
formatter.run()

val props = new Properties
props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, "100@localhost:12345")
Expand Down
Loading