Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16518; Adding standalone argument for storage #16325

Closed
wants to merge 15 commits into from
133 changes: 125 additions & 8 deletions core/src/main/scala/kafka/tools/StorageTool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package kafka.tools

import kafka.server.KafkaConfig

import java.io.PrintStream
import java.io.{File, PrintStream}
import java.nio.file.{Files, Paths}
import kafka.utils.{Exit, Logging}
import net.sourceforge.argparse4j.ArgumentParsers
Expand All @@ -36,10 +36,18 @@ import org.apache.kafka.common.security.scram.internals.ScramFormatter
import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
import org.apache.kafka.raft.internals.{StringSerde, VoterSet}
import org.apache.kafka.server.common.FeatureVersion

import org.apache.kafka.snapshot.{FileRawSnapshotWriter, RecordsSnapshotWriter}
import org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_NAME
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.raft.Endpoints
import org.apache.kafka.raft.QuorumConfig.{parseVoterConnections, validateControllerQuorumVoters}
import org.apache.kafka.snapshot.Snapshots.BOOTSTRAP_SNAPSHOT_ID

import java.net.InetSocketAddress
import java.util
import java.util.{Base64, Collections, Optional}
import java.util.{Base64, Collections, Optional, OptionalInt}
import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ArrayBuffer
Expand Down Expand Up @@ -90,6 +98,7 @@ object StorageTool extends Logging {
/**
* Validates arguments, configuration, prepares bootstrap metadata and delegates to {{@link formatCommand}}.
* Visible for testing.
*
* @param namespace Arguments
* @param config The server configuration
* @return The exit code
Expand All @@ -102,6 +111,24 @@ object StorageTool extends Logging {
setClusterId(clusterId).
setNodeId(config.nodeId).
build()
val standaloneMode = namespace.getBoolean("standalone")

val controllersQuorumVoters = namespace.getString("controller_quorum_voters")
if(standaloneMode && controllersQuorumVoters != null) {
throw new TerseFailure("Both --standalone and --controller-quorum-voters were set. Only one of the two flags can be set.")
}

var listeners: util.Map[ListenerName, InetSocketAddress] = new util.HashMap()
if (standaloneMode) {
listeners = createStandaloneVoterMap(config)
} else if(controllersQuorumVoters != null) {
if (!validateControllerQuorumVoters(controllersQuorumVoters)) {
throw new TerseFailure("Expected schema for --controller-quorum-voters is <replica-id>[-<replica-directory-id>]@<host>:<port>")
}
val controllerQuorumVoterMap: util.Map[Integer, InetSocketAddress] = parseVoterConnections(Collections.singletonList(controllersQuorumVoters))
listeners = parseControllerQuorumVotersMap(controllerQuorumVoterMap, metaProperties, config)
}

val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer()
val specifiedFeatures: util.List[String] = namespace.getList("feature")
val releaseVersionFlagSpecified = namespace.getString("release_version") != null
Expand Down Expand Up @@ -137,7 +164,7 @@ object StorageTool extends Logging {
"a legacy cluster. Formatting is only supported for clusters in KRaft mode.")
}
formatCommand(System.out, directories, metaProperties, bootstrapMetadata,
metadataVersion,ignoreFormatted)
metadataVersion, ignoreFormatted, listeners)
}

private def validateMetadataVersion(metadataVersion: MetadataVersion, config: KafkaConfig): Unit = {
Expand Down Expand Up @@ -226,7 +253,15 @@ object StorageTool extends Logging {
help(s"A KRaft release version to use for the initial metadata.version. The minimum is ${MetadataVersion.IBP_3_0_IV0}, the default is ${MetadataVersion.LATEST_PRODUCTION}")
formatParser.addArgument("--feature", "-f").
help("A feature upgrade we should perform, in feature=level format. For example: `metadata.version=5`.").
action(append());
action(append())
formatParser.addArgument("--standalone", "-s").
help("This flag will bootstrap the controller in standalone as the only KRaft controller if the Kafka" +
" cluster. Use the --controller-quorum-voters flag instead to bootstrap a controller cluster with more than one" +
" controller.").
action(storeTrue())
formatParser.addArgument("--controller-quorum-voters", "-q").
help("This flag will bootstrap a controller cluster with more than one controller.").
action(store())

parser.parseArgsOrFail(args)
}
Expand Down Expand Up @@ -507,10 +542,12 @@ object StorageTool extends Logging {
directories: Seq[String],
metaProperties: MetaProperties,
metadataVersion: MetadataVersion,
ignoreFormatted: Boolean
ignoreFormatted: Boolean,
listeners: util.Map[ListenerName, InetSocketAddress]
): Int = {
val bootstrapMetadata = buildBootstrapMetadata(metadataVersion, None, "format command")
formatCommand(stream, directories, metaProperties, bootstrapMetadata, metadataVersion, ignoreFormatted)
formatCommand(stream, directories, metaProperties, bootstrapMetadata, metadataVersion, ignoreFormatted,
listeners)
}

def formatCommand(
Expand All @@ -519,7 +556,8 @@ object StorageTool extends Logging {
metaProperties: MetaProperties,
bootstrapMetadata: BootstrapMetadata,
metadataVersion: MetadataVersion,
ignoreFormatted: Boolean
ignoreFormatted: Boolean,
listeners: util.Map[ListenerName, InetSocketAddress]
): Int = {
if (directories.isEmpty) {
throw new TerseFailure("No log directories found in the configuration.")
Expand Down Expand Up @@ -563,6 +601,13 @@ object StorageTool extends Logging {
})
})
copier.writeLogDirChanges()
if (listeners != null && !listeners.isEmpty) {
metaPropertiesEnsemble.emptyLogDirs().forEach(logDir => {
val voterSet: VoterSet = getVoterSet(metaProperties.nodeId(),
copier.logDirProps().get(logDir).directoryId().get(), listeners)
writeCheckpointFile(stream, logDir, voterSet)
})
}
}
0
}
Expand All @@ -589,4 +634,76 @@ object StorageTool extends Logging {
(nameAndLevel._1, nameAndLevel._2)
}.toMap
}

/**
 * Writes the bootstrap checkpoint snapshot for the cluster metadata partition into
 * the given log directory, embedding the initial voter set.
 *
 * Does nothing when `voterSet` is null, so no snapshot directory or partial
 * snapshot file is created in that case.
 *
 * @param stream   Stream used to report where the snapshot was written
 * @param logDir   The metadata log directory to write the snapshot under
 * @param voterSet The initial KRaft voter set to embed in the snapshot; may be null
 */
def writeCheckpointFile(stream: PrintStream, logDir: String,
                        voterSet: VoterSet): Unit = {
  // Only create the snapshot directory and writer when there is actually a voter
  // set to write; creating the raw writer unconditionally would leak an open,
  // partially-written bootstrap snapshot file when voterSet is null.
  if (voterSet != null) {
    val snapshotDir = createLogDirectory(new File(logDir), CLUSTER_METADATA_TOPIC_NAME)
    val rawSnapshotWriter = FileRawSnapshotWriter.create(snapshotDir.toPath, BOOTSTRAP_SNAPSHOT_ID)
    val snapshotWriter = new RecordsSnapshotWriter.Builder()
      .setKraftVersion(1)
      .setVoterSet(Optional.of(voterSet))
      .setRawSnapshotWriter(rawSnapshotWriter)
      .build(new StringSerde)
    try {
      snapshotWriter.freeze()
    } finally {
      // Always release the underlying file resources, even if freeze() fails.
      snapshotWriter.close()
    }
    // Only reached when freeze() succeeded; previously this was printed from the
    // finally block and claimed success even when the snapshot write had failed.
    stream.println(s"Snapshot written to $snapshotDir")
  }
}

/**
 * Builds the single-entry voter set for this node from its advertised listener
 * endpoints, node id, and metadata log directory id.
 */
private def getVoterSet(nodeId: OptionalInt, directoryId: Uuid, listeners: java.util.Map[ListenerName, InetSocketAddress]) = {
  VoterSet.fromMap(Endpoints.fromInetSocketAddresses(listeners), nodeId.getAsInt, directoryId)
}

/**
 * Builds the listener-name to socket-address map for a standalone controller from
 * the advertised controller listeners in the server configuration.
 *
 * @param config The server configuration to read advertised controller listeners from
 * @return A mutable map of listener name to advertised controller address
 */
def createStandaloneVoterMap(config: KafkaConfig): util.Map[ListenerName, InetSocketAddress] = {
  val voterMap = new util.HashMap[ListenerName, InetSocketAddress]()
  for (endpoint <- config.effectiveAdvertisedControllerListeners) {
    voterMap.put(endpoint.listenerName, new InetSocketAddress(endpoint.host, endpoint.port))
  }
  voterMap
}

private def parseControllerQuorumVotersMap(controllerQuorumVoterMap: util.Map[Integer, InetSocketAddress],
metaProperties: MetaProperties,
config: KafkaConfig): util.Map[ListenerName, InetSocketAddress] = {
Comment on lines +676 to +678
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect indentation:

  private def parseControllerQuorumVotersMap(
    controllerQuorumVoterMap: util.Map[Integer, InetSocketAddress],
    metaProperties: MetaProperties,
    config: KafkaConfig
): util.Map[ListenerName, InetSocketAddress] = {

Aren't you losing information if you map from util.Map[Integer, InetSocketAddress] to util.Map[ListenerName, InetSocketAddress]? Why are you removing replicas that are not the local replica? The VoterSet must contain all of the voters in --controller-quorum-voters not just the local replica.

Why would Kafka require all of the voters in --controller-quorum-voters to only use the local voter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the kip description
"When the format command is executed with this option it will read the node.id configured in the properties file specified by the --config option and compare it against the replica-id specified in --controller-quorum-voters. If there is a match, it will write the specified replica-directory-id to the directory.id property in the meta.properties for the metadata.log.dir directory."

I tried adding the if condition
if (metaProperties.nodeId().getAsInt == replicaId) )

in the method

Maybe I am wrong. Could you please suggest the code?

val listeners: util.Map[ListenerName, InetSocketAddress] = new util.HashMap()
controllerQuorumVoterMap.keySet().forEach(replicaId => {
if (metaProperties.nodeId().getAsInt == replicaId) {
val listenerNameOption = config.effectiveAdvertisedControllerListeners.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should assume that the listener name is the default listener name. The default listener name is the first listener in https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala#L730-L737.

There is also this code for an example of getting the first listener name: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/raft/RaftManager.scala#L267

There is one validation that we should do for the local replica. The local replica's default listener (name, host and port) matches the entry specified in --controller-quorum-voters.

find {
endpoint =>
endpoint.port == controllerQuorumVoterMap.get(replicaId).getPort &&
(controllerQuorumVoterMap.get(replicaId).getHostString.split("//").contains(endpoint.host)
|| (endpoint.host == null && controllerQuorumVoterMap.get(replicaId).getHostString.contains("localhost")))
}.
map(_.listenerName)
listenerNameOption match {
case Some(listenerName) =>
listeners.put(listenerName, controllerQuorumVoterMap.get(replicaId))
case None =>
// No matching endpoint was found
throw new TerseFailure(s"No matching endpoint was found in controller quorum voters")
}
}
})
listeners
}

/**
 * Creates (if necessary) and returns the directory named `logDirName` under the
 * absolute path of `logDir`, including any missing parent directories.
 */
private def createLogDirectory(logDir: File, logDirName: String): File = {
  val dir = new File(logDir.getAbsolutePath, logDirName)
  Files.createDirectories(dir.toPath)
  dir
}

}
Loading