-
Notifications
You must be signed in to change notification settings - Fork 14.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-16518; Adding standalone argument for storage #16325
Conversation
@jsancio Output looks like below. ./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties -s % cat /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000-0000000000.checkpoint |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the changes @muralibasani . We should also implement --controller-quorum-voters in this PR as it will inform the signature needed to implement both flags.
@muralibasani You can use |
51e8bc4
to
799bc77
Compare
@jsancio , updated with controller option
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the changes.
if (!validateControllerQuorumVoters(controllersQuorumVoters)) { | ||
throw new TerseFailure("Expected schema for --controller-quorum-voters is <replica-id>[-<replica-directory-id>]@<host>:<port>") | ||
} | ||
advertisedListenerEndpoints = config.effectiveAdvertisedControllerListeners |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not correct, right? We need to parse the string that you validated in validateControllerQuorumVoters(controllersQuorumVoters)
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
advertisedListenerEndpoints: scala.collection.Seq[kafka.cluster.EndPoint], | ||
controllersQuorumVoters: String |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you consider making the input to this method just voters: VoterSet
instead of advertisedListenerEndpoints
and controllerQuorrumVoters
.
When the --standalone flag is used the set of voters can be constructed from VoterSet.fromMap(createStandaloneVoterMap(nodeId, nodeDirectoryId, config.effectiveAdvertisedControllerListeners))
.
When the --controller-quorum-voters flag is used the set of voters can be constructed from VoterSet.fromMap(parseVotersToMap(controllersQuorumVoters))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tried considering but directoryId is derived only in format method. So updated format method with only listeners param.
} | ||
|
||
if (standaloneMode) { | ||
advertisedListenerEndpoints = config.effectiveAdvertisedBrokerListeners |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The bootstrapping checkpoint should only be generated on controllers (config.processRoles.contains(ProcessRole.ControllerRole)
). If the node is a broker only then the bootstrapping checkpoint should not get generated.
var listeners: util.Map[ListenerName, InetSocketAddress] = new util.HashMap() | ||
if (standaloneMode) { | ||
advertisedListenerEndpoints = config.effectiveAdvertisedBrokerListeners | ||
listeners = createStandaloneVoterMap(config) | ||
} else if(controllersQuorumVoters != null) { | ||
if (!validateControllerQuorumVoters(controllersQuorumVoters)) { | ||
throw new TerseFailure("Expected schema for --controller-quorum-voters is <replica-id>[-<replica-directory-id>]@<host>:<port>") | ||
} | ||
advertisedListenerEndpoints = config.effectiveAdvertisedControllerListeners | ||
val controllerQuorumVoterMap: util.Map[Integer, InetSocketAddress] = parseVoterConnections(Collections.singletonList(controllersQuorumVoters)) | ||
listeners = parseControllerQuorumVotersMap(controllerQuorumVoterMap, metaProperties, config) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't look correct. VoterSet
is basically a Map[Integer, (Uuid, Map[ListenerName, InetSocketAddress])]
where Integer
is the replica id and Uuid
is the replica directory id. The type for listener doesn't contain all of the information needed to generate a VoterSet
for all the possible configuration cases.
Note that the value for --controller-quorum-voters
has the follow schema: <replica-id>-<replica-directory-id>@<host>:<port>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This means we would have to update
public static Map<Integer, InetSocketAddress> parseVoterConnections(List<String> voterEntries) { |
@@ -199,7 +200,7 @@ object StorageTool extends Logging { | |||
val level: java.lang.Short = specifiedFeatures.getOrElse(feature.featureName, feature.defaultValue(metadataVersionForDefault)) | |||
// Only set feature records for levels greater than 0. 0 is assumed if there is no record. Throw an error if level < 0. | |||
if (level != 0) { | |||
allNonZeroFeaturesAndLevels.append(feature.fromFeatureLevel(level, unstableFeatureVersionsEnabled)) | |||
allNonZeroFeaturesAndLevels.append(feature.fromFeatureLevel(level, unstableFeatureVersionsEnabled)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we are missing a space.
) : UserScramCredentialRecord = { | ||
mechanism: String, | ||
config: String | ||
) : UserScramCredentialRecord = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra space between )
and :
. It should be ): User... = {
val argMap = config.substring(1, config.length - 1) | ||
.split(",") | ||
.map(_.split("=(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)")) | ||
.map(args => args(0) -> args(1).replaceAll("\"", "")).toMap | ||
.split(",") | ||
.map(_.split("=(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)")) | ||
.map(args => args(0) -> args(1).replaceAll("\"", "")).toMap |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, this is not the correct indentation.
throw new TerseFailure(s"The 'iterations' value must be >= ${scramMechanism.minIterations()} for add-scram") | ||
throw new TerseFailure(s"The 'iterations' value must be >= ${scramMechanism.minIterations()} for add-scram") | ||
} | ||
if (iterations > scramMechanism.maxIterations()) { | ||
throw new TerseFailure(s"The 'iterations' value must be <= ${scramMechanism.maxIterations()} for add-scram") | ||
throw new TerseFailure(s"The 'iterations' value must be <= ${scramMechanism.maxIterations()} for add-scram") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like you have extra two spaces!
private def parseControllerQuorumVotersMap(controllerQuorumVoterMap: util.Map[Integer, InetSocketAddress], | ||
metaProperties: MetaProperties, | ||
config: KafkaConfig): util.Map[ListenerName, InetSocketAddress] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Incorrect indentation:
private def parseControllerQuorumVotersMap(
controllerQuorumVoterMap: util.Map[Integer, InetSocketAddress],
metaProperties: MetaProperties,
config: KafkaConfig
): util.Map[ListenerName, InetSocketAddress] = {
Aren't you losing information if you map from util.Map[Integer, InetSocketAddress]
to util.Map[ListenerName, InetSocketAddress]
? Why are you removing replicas that are not the local replica? The VoterSet
must contain all of the voters in --controller-quorum-voters
not just the local replica.
Why would Kafka require all of the voters in --controller-quorum-voters
to only use the local voter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From the kip description
"When the format command is executed with this option it will read the node.id configured in the properties file specified by the --config option and compare it against the specified in --controller-quorum-voters. If there is a match, it will write the specified to the directory.id property in the meta.properties for the metadata.log.dir directory."
I tried adding the if condition
if (metaProperties.nodeId().getAsInt == replicaId) )
in the method
May be am wrong. Can you pls suggest code maybe?
val listeners: util.Map[ListenerName, InetSocketAddress] = new util.HashMap() | ||
controllerQuorumVoterMap.keySet().forEach(replicaId => { | ||
if (metaProperties.nodeId().getAsInt == replicaId) { | ||
val listenerNameOption = config.effectiveAdvertisedControllerListeners. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should assume that the listener name is the default listener name. The default listener name is the first listener in https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala#L730-L737.
There is also this code for an example of getting the first listener name: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/raft/RaftManager.scala#L267
There is one validation that we should do for the local replica. The local replica's default listener (name, host and port) matches the entry specified in --controller-quorum-voters
.
"16 bytes of a base64-encoded UUID", assertThrows(classOf[TerseFailure], | ||
() => StorageTool.buildMetadataProperties("invalid", config)).getMessage) | ||
"16 bytes of a base64-encoded UUID", assertThrows(classOf[TerseFailure], | ||
() => StorageTool.buildMetadataProperties("invalid", config)).getMessage) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indentation doesn't look correct.
"SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\",iterations=8192]", | ||
"-S", | ||
"SCRAM-SHA-256=[name=george,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\",iterations=8192]") | ||
"SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\",iterations=8192]", | ||
"-S", | ||
"SCRAM-SHA-256=[name=george,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\",saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\",iterations=8192]") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Incorrect indentation.
assertEquals(0, exitStatus) | ||
assertEquals(0, exitStatus) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Incorrect indentation!
Closing this, as the StorageTool class will be refactored in a different PR. |
We ended up doing this in: #16669 |
Resolves : https://issues.apache.org/jira/browse/KAFKA-16518
Adds a new argument "standalone" to kafka-storage.sh
If standalone mode, creates a checkpoint file in metadata dir ${kafkaConfig.metadataLogDir}/__cluster_metadata-0/
Committer Checklist (excluded from commit message)