
MINOR: Rewrite the meta.properties handling code in Java and fix some issues #14628

Merged
merged 1 commit into apache:trunk from KAFKA-15532
Nov 9, 2023

Conversation

Contributor

@cmccabe cmccabe commented Oct 24, 2023

meta.properties files are used by Kafka to identify log directories within the filesystem.
Previously, the code for handling them was in BrokerMetadataCheckpoint.scala. This PR rewrites the
code for handling them in Java and moves it to the org.apache.kafka.metadata.properties namespace. It
also gets rid of the separate types for v0 and v1 meta.properties objects. Having separate types
wasn't so bad back when we had a strict rule that zk clusters used v0 and kraft clusters used v1.
But ZK migration has blurred the lines. Now, a zk cluster may have either v0 or v1, if it is
migrating, and a kraft cluster may have either v0 or v1, at any time.

The new code distinguishes between an individual meta.properties file, which is represented by
MetaProperties, and a collection of meta.properties files, which is represented by
MetaPropertiesEnsemble. It is useful to have this distinction, because in JBOD mode, even if some
log directories are inaccessible, we can still use the ensemble to extract needed information like
the cluster ID. (Of course, even when not in JBOD mode, KRaft servers have always been able to
configure a metadata log directory separate from the main log directory.)

Since we recently added a unique directory.id to each meta.properties file, the previous convention
of passing a "canonical" MetaProperties object for the cluster around to various places in the code
needs to be revisited. After all, we can no longer assume all of the meta.properties files are the
same. This PR fixes these parts of the code. For example, it fixes the constructors of
ControllerApis and RaftManager to just take a cluster ID, rather than a MetaProperties object. It
fixes some other parts of the code, like the constructor of SharedServer, to take a
MetaPropertiesEnsemble object.

Another goal of this PR was to centralize meta.properties validation a bit more and make it
unit-testable. For this purpose, the PR adds MetaPropertiesEnsemble.verify, and a few other
verification methods. These enforce invariants like "the metadata directory must be readable," and
so on.
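
For illustration, here is a rough sketch of how the two classes fit together, assembled only from the calls that appear in this PR's review snippets (Loader, Copier, generateValidDirectoryId, setLogDirProps, writeLogDirChanges); the exact signatures and checked exceptions may differ from the merged code:

import org.apache.kafka.metadata.properties.MetaProperties;
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;

public class MetaPropertiesEnsembleSketch {
    public static void main(String[] args) throws Exception {
        // Load every configured log directory into a single ensemble.
        MetaPropertiesEnsemble ensemble = new MetaPropertiesEnsemble.Loader().
            addLogDir("/tmp/kafka-logs-0").
            addLogDir("/tmp/kafka-logs-1").
            load();

        // Use a Copier to fill in any missing directory IDs and persist the changes.
        MetaPropertiesEnsemble.Copier copier = new MetaPropertiesEnsemble.Copier(ensemble);
        ensemble.nonFailedDirectoryProps().forEachRemaining(entry -> {
            MetaProperties.Builder builder = new MetaProperties.Builder(entry.getValue());
            if (!builder.directoryId().isPresent()) {
                builder.setDirectoryId(copier.generateValidDirectoryId());
            }
            copier.setLogDirProps(entry.getKey(), builder.build());
        });
        copier.writeLogDirChanges((logDir, e) ->
            System.err.println("Failed to write meta.properties in " + logDir + ": " + e));
    }
}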

@cmccabe cmccabe changed the title MINOR: Rewrite the meta.properties code in Java MINOR: Rewrite the meta.properties handling code in Java and fix some issues Oct 24, 2023
Contributor

@pprovenzano pprovenzano left a comment

This fails for the case where there are no log directories to start and we are creating the node and directories for the first time.

ERROR Exiting Kafka due to fatal exception during startup. (kafka.Kafka$)
java.lang.RuntimeException: No readable meta.properties files found.
    at org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.verify(MetaPropertiesEnsemble.java:406)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:245)
    at kafka.Kafka$.main(Kafka.scala:113)
    at kafka.Kafka.main(Kafka.scala)

@@ -227,8 +231,12 @@ public KafkaClusterTestKit build() throws Exception {
setupNodeDirectories(baseDirectory, node.metadataDirectory(), Collections.emptyList());
BootstrapMetadata bootstrapMetadata = BootstrapMetadata.
fromVersion(nodes.bootstrapMetadataVersion(), "testkit");
MetaPropertiesEnsemble metaPropertiesEnsemble = new MetaPropertiesEnsemble.Loader().
setLoadMissingBehavior(LoadMissingBehavior.EXCEPTION).
addMetadataLogDir(node.metadataDirectory()).

Contributor

LoadMissingBehavior.EXCEPTION is not defined anywhere.

Contributor Author

Yes, this was left over from some earlier code, sorry. Removed.

Contributor

@pprovenzano pprovenzano left a comment

It also fails to upgrade a meta.properties file created by 3.6.0 to include the directory IDs.

) {
props.store(pw, "");
fos.flush();
fos.getFD().sync();

Contributor

for my better understanding, could you please explain why fsync() is necessary for the temp file? I am curious because it is going to be renamed in the next step and if there is a process crash losing data in page cache, we anyways won't want to re-use this temp file.

Contributor

If the process crashes and we lose the data in the page cache (from say a power failure) after the rename but before the data of the file is on disk, then on some filesystems this would result in an empty target file.

Contributor Author

This is what the previous code in BrokerMetadataCheckpoint.scala did. The reason for doing it this way is for extra safety, as @pprovenzano commented. Some filesystems (I think ext3 was a notable offender) tended to lose data when fsync was omitted, even in the case where other operations like rename were performed later.

Since we only very rarely write the meta.properties file, it's surely not worth optimizing this by avoiding the fsync.

Contributor

hmm...that makes sense, but in that case, do we need to flush the directory again after renaming? (In atomicMoveWithFallback() the third parameter defaults to needFlushParentDir=true).

Contributor Author

In some filesystems, if you don't fsync the parent directory, the file can be lost if the machine loses power. This is again something ext3 was famous for: letting you sync files to disk, but then making them unreachable because you didn't sync the directory.

Contributor

Understood. Thank you for answering my questions.

}
File targetFile = new File(path);
try {
Utils.atomicMoveWithFallback(tempFile.toPath(), targetFile.toPath());

Contributor

same as above, do we want to use atomicMoveWithFallback with needFlushParentDir = false here?

Contributor Author

Same as above, this is needed for safety. I will add a boolean in case someone wants to use this function in a context where safety doesn't matter as much (like unit tests).
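
For context, the write path under discussion follows the classic safe-write sequence: write a temp file, fsync it, atomically rename it over the target, then fsync the parent directory. Below is a minimal JDK-only sketch of that sequence (an illustration, not the actual PropertiesUtils code; the directory fsync works on Linux but not on every platform):

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.Properties;

public final class SafePropertiesWrite {
    // Write props to target so that a crash at any point leaves either the old
    // contents or the new contents on disk, never a truncated or empty file.
    public static void write(Properties props, Path target) throws IOException {
        Path temp = target.resolveSibling(target.getFileName() + ".tmp");
        try (FileOutputStream out = new FileOutputStream(temp.toFile())) {
            props.store(out, "");
            out.flush();
            out.getFD().sync();        // force the temp file's contents to disk
        }
        // Atomically replace the target with the temp file.
        Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
        // Fsync the parent directory so the rename itself survives a power loss
        // (this is the role of needFlushParentDir=true in atomicMoveWithFallback).
        try (FileChannel dir = FileChannel.open(target.getParent(), StandardOpenOption.READ)) {
            dir.force(true);
        }
    }
}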

import java.nio.file.Paths;
import java.util.Properties;

public final class PropertiesUtils {

Contributor

Can we extract this as a "type" of file (perhaps called PropertiesFile) that Kafka creates, similar to how we have a type CheckpointFile or Snapshots? This new type of file could be re-used in the future for other use cases. It would also help unify the places where we write to the file system and prevent accidental fsync(), which we are trying to solve in #14242.

Contributor Author

@cmccabe cmccabe Oct 26, 2023

Hmm. Well, the purpose of MetaProperties is that we know what types we want in the meta.properties file and we can check for those specifically.

If we just want a generic key/value file where we don't know ahead of time what is in it, java.util.Properties does that just fine. It has a bit of old-school JDK 1.0 messiness, but on the other hand it's instantly familiar to Java programmers, and that has some value. So I doubt replacing or wrapping would repay the time investment needed, to be honest.

I don't see the connection with fsync since we sometimes want to fsync properties files, but other times not. So it's ultimately up to the programmer to do the right thing, as always.
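
As a point of reference, reading such a file with plain java.util.Properties takes only a few lines; the key names below (cluster.id, directory.id) are the ones discussed in this PR, and MetaProperties is what layers type checking and validation on top:

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public final class ReadMetaProperties {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream("/tmp/kafka-logs-0/meta.properties")) {
            props.load(in);
        }
        // Plain string lookups; MetaProperties adds the validation on top of this.
        String clusterId = props.getProperty("cluster.id");
        String directoryId = props.getProperty("directory.id");
        System.out.println("cluster.id=" + clusterId + " directory.id=" + directoryId);
    }
}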

* An immutable class which contains the per-log-directory information stored in an individual
* meta.properties file.
*/
public final class MetaProperties {

Contributor

Do we want to move this into the core module? Asking because (correct me if I am wrong) these properties are not coupled to the Kafka control plane / metadata handling layer.

Contributor Author

Hmm, good question. I guess I think it does belong here.

The :metadata module in Gradle is about more than just the controller. It's about how Kafka handles metadata in general and has things like node registrations, metadata publishing and loading, etc. I think this is in keeping with that.

Also, independently of all that, we should all be striving to shrink the :core module, not add more stuff there. :)

Member

@soarez soarez left a comment

This is a nice cleanup, thank you. I wish I had thought of this when I added the directoryId changes.

private final OptionalInt nodeId;

/**
* The JBOD directory ID, or Optional.empty if none is specified.

Member

Suggested change
* The JBOD directory ID, or Optional.empty if none is specified.
* The directory ID, or Optional.empty if none is specified.

LOG.info("Wrote out {} meta.properties file {} containing {}",
newSet.contains(entry.getKey()) ? "new" : "changed",
entry.getKey(), entry.getValue());
} catch (Exception e) {

Member

PropertiesUtils.writePropertiesFile throws IOException, so can we restrict the catch exception type to that?

Contributor Author

ok

@@ -57,7 +56,7 @@ public boolean hasOfflineLogDir(String logDir) {
* @param msg Error message.
* @param e Exception instance.
*/
public void maybeAddOfflineLogDir(String logDir, String msg, IOException e) {
public void maybeAddOfflineLogDir(String logDir, String msg, Exception e) {

Member

It seems odd to take a directory offline with an exception that's not related to IO. I think we don't need this change if MetaPropertiesEnsemble.writeLogDirChanges() restricts the catch type to IOException.

Contributor Author

Fair enough. I changed it above to just catch IOE so I will change it here as well.

return this == V0;
}

public boolean alwaysHasId() {

Member

This one is a bit oddly named. Did you mean alwaysHasNodeId instead?

Contributor Author

yes, let's rename it to alwaysHasNodeId

}
})

// Set directory IDs on all directories. Rewrite the files if needed.

Member

This is a better place to generate missing directory IDs 👍

Contributor Author

@cmccabe cmccabe Oct 26, 2023

Yeah. Maybe eventually we'll also auto-upgrade from v0 -> v1 here (once not in migration mode any more)

v0 is quite annoying since there are basically no required fields at all.

But one step at a time...

val newMetaProperties = new MetaProperties.Builder(metaProperties).
setDirectoryId(copier.generateValidDirectoryId()).
build()
copier.logDirProps().put(logDir, newMetaProperties)

Member

This interface is a bit strange – rather than returning a modifiable map, something like copier.putLogDirProps(logDir, newMetaProperties) would be more intuitive.

Contributor Author

@cmccabe cmccabe Oct 26, 2023

Fair point. I added a setter function.

val initialMetaPropsEnsemble = {
val loader = new MetaPropertiesEnsemble.Loader()
config.logDirs.foreach(loader.addLogDir(_))
loader.load()
}

Member

Shouldn't we be generating missing directory IDs here? As we do in KafkaRaftServer, since LogManager no longer does that.

Contributor Author

yes. good catch. @pprovenzano also found this bug through testing :)

Comment on lines 282 to 290
metaProps.directoryId().ifPresent(directoryId => {
result += (dir.getAbsolutePath -> directoryId)
})

Member

This seems off to me. I think we should expect every log directory to have an ID by this point.

Contributor Author

@cmccabe cmccabe Oct 26, 2023

Yes, I agree it should have a directory ID by this point, during the course of normal operation. By handling the no-ID case, I was trying to avoid unit tests failing. (To be clear, I haven't tested if they do fail, but I thought they might.)

Maybe we could remove this as a follow-on?

Member

Sounds ok to me 👍

Contributor Author

cmccabe commented Oct 26, 2023

This fails for the case where there are no log directories to start and we are creating the node and directories for the first time.

Thanks for testing. Yes, this was missing some code to write out meta.properties files to empty directories in the ZK case. Should be fixed in the next revision.

Contributor Author

cmccabe commented Oct 31, 2023

This was getting too big, so I split off a part 1 here: https://github.com/apache/kafka/pull/14678/files

Contributor Author

cmccabe commented Nov 3, 2023

I updated this based on part 1 (which is now committed) and the other changes in trunk.

@pprovenzano
Contributor

This PR does not work for ZK. You cannot create the directories in LogManager.scala but then write the directoryIds later. The directoryIds must be generated either when LogManager scala is first called or the directories must be created with the new meta.properties file before LogManager is called.

Contributor Author

cmccabe commented Nov 7, 2023

This PR does not work for ZK. You cannot create the directories in LogManager.scala but then write the directoryIds later. The directoryIds must be generated either when LogManager scala is first called or the directories must be created with the new meta.properties file before LogManager is called.

I moved the meta.properties creation before the creation of LogManager.scala to avoid this issue.

Member

@mumrah mumrah left a comment

Thanks for the refactor @cmccabe. As always, nice to see the Scala code rewritten as Java. I haven't tested this, but looking through the changes I think it looks good. I left a few small comments inline. Pending a good build, LGTM.

* The version of a meta.properties file.
*/
public enum MetaPropertiesVersion {
V0(0),

Member

Should we document the differences between v0 and v1 here?

Contributor Author

Good point. Added.

}

public String numberString() {
return "" + number;

Member

Just curious, why not use Integer.toString here? Is this string coercion more efficient or something?

Contributor Author

I was just lazy. "" + something always works, and doesn't require you to know the type of "something"

Integer.toString is probably better (although I'm not 100% sure), so I'll change it to that.
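
Putting both review comments together, the enum might end up looking roughly like this (a sketch, not the merged code; the v0/v1 notes summarize the differences discussed in this PR: v0 is the ZooKeeper-era format with almost no required fields, which stores the node ID under broker.id, while v1 is the KRaft-era format that requires cluster.id and node.id):

/**
 * The version of a meta.properties file.
 */
public enum MetaPropertiesVersion {
    /**
     * The original format written by ZooKeeper-based brokers. Almost nothing is
     * required, and the node ID (if present) is stored under the broker.id key.
     */
    V0(0),

    /**
     * The KRaft-era format, which requires cluster.id and node.id.
     */
    V1(1);

    private final int number;

    MetaPropertiesVersion(int number) {
        this.number = number;
    }

    public int number() {
        return number;
    }

    public String numberString() {
        return Integer.toString(number);
    }
}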

public void writeLogDirChanges(BiConsumer<String, IOException> errorHandler) {
Map<String, MetaProperties> newOrChanged = new HashMap<>();
HashSet<String> newSet = new HashSet<>();
for (Entry<String, MetaProperties> entry : prev.logDirProps().entrySet()) {

Contributor

This is getting the entrySet of an Ensemble which doesn't include directoryId(), so adding a directoryId doesn't show up as changed and thus won't get committed to disk.

Contributor Author

@cmccabe cmccabe Nov 8, 2023

Yes, there was a bug here where the metaProps we were taking was actually the previous one. Fixed.

copier.setLogDirProps(logDir, builder.build())
})
copier.emptyLogDirs().clear()
copier.writeLogDirChanges((logDir, e) => {

Contributor

@pprovenzano pprovenzano Nov 8, 2023

Need to create the directory if it doesn't exist. Here is some code I took from LogManager.

diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 9843204dc1..48eee053ce 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -62,7 +62,7 @@ import org.apache.kafka.server.util.KafkaScheduler
 import org.apache.kafka.storage.internals.log.LogDirFailureChannel
 import org.apache.zookeeper.client.ZKClientConfig
 
-import java.io.IOException
+import java.io._
 import java.net.{InetAddress, SocketTimeoutException}
 import java.util
 import java.util.{Optional, OptionalInt, OptionalLong}
@@ -276,13 +276,28 @@ class KafkaServer(
           val copier = new MetaPropertiesEnsemble.Copier(initialMetaPropsEnsemble)
           initialMetaPropsEnsemble.nonFailedDirectoryProps().forEachRemaining(e => {
             val logDir = e.getKey
+
+            val dir = new File(logDir).getAbsoluteFile
             val builder = new MetaProperties.Builder(e.getValue).
               setClusterId(_clusterId).
               setNodeId(config.brokerId)
-            if (!builder.directoryId().isPresent()) {
-              builder.setDirectoryId(copier.generateValidDirectoryId())
+
+            try {
+              if (!builder.directoryId().isPresent()) {
+                if (!dir.exists) {
+                  info(s"Log directory ${dir.getAbsolutePath} not found, creating it.")
+                  val created = dir.mkdirs()
+                  if (!created)
+                    throw new IOException(s"Failed to create data directory ${dir.getAbsolutePath}")
+                  Utils.flushDir(dir.toPath.toAbsolutePath.normalize.getParent)
+                }
+                builder.setDirectoryId(copier.generateValidDirectoryId())
+              }
+              copier.setLogDirProps(logDir, builder.build())
+            } catch {
+              case e: IOException =>
+                info(s"Failed to create or validate data directory ${dir.getAbsolutePath}", e)
             }
-            copier.setLogDirProps(logDir, builder.build())
           })
           copier.emptyLogDirs().clear()
           copier.writeLogDirChanges((logDir, e) => {

Contributor Author

I added a boolean that indicates the directory should be created if it doesn't exist.

Contributor

@pprovenzano pprovenzano left a comment

I've left a few more comments.

@cmccabe cmccabe merged commit 7060c08 into apache:trunk Nov 9, 2023
@cmccabe cmccabe deleted the KAFKA-15532 branch November 9, 2023 17:32
val brokerId = config.brokerId

if (brokerId >= 0 && brokerMetadata.brokerId.exists(_ != brokerId))
throw new InconsistentBrokerIdException(

Member

@cmccabe There is a test that's meant to catch this exception, but it was buggy: https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala#L137

What's the expected behavior now and should the code or test be changed?

Contributor Author

It just throws a RuntimeException now, I believe. I don't think it's necessary to have a custom exception for every meta.properties issue.

@ijuma ijuma mentioned this pull request Nov 19, 2023
rreddy-22 pushed a commit to rreddy-22/kafka-rreddy that referenced this pull request Jan 2, 2024
yyu1993 pushed a commit to yyu1993/kafka that referenced this pull request Feb 15, 2024
AnatolyPopov pushed a commit to aiven/kafka that referenced this pull request Feb 16, 2024
clolov pushed a commit to clolov/kafka that referenced this pull request Apr 5, 2024
@@ -60,7 +63,11 @@ object StorageTool extends Logging {
if (!metadataVersion.isKRaftSupported) {
throw new TerseFailure(s"Must specify a valid KRaft metadata version of at least 3.0.")
}
val metaProperties = buildMetadataProperties(clusterId, config.get)

Member

@chia7712 chia7712 Jul 10, 2024

@cmccabe Sorry for making noise in this merged PR, but I have a question about what counts as a valid "cluster id".

Before this patch, the cluster ID was required to be a "valid" UUID (buildMetadataProperties does the validation). For example, the length can't be larger than 24. However, this patch removed the UUID validation, and hence we can start a cluster with a non-UUID cluster ID. Is this change expected?

Member

@soarez soarez Jul 11, 2024

I'm keen for @cmccabe to confirm, but suspect this is not intentional.

Since we released 3.7, 3.7.1 and soon 3.8 without this validation, we should continue to accept it in the broker startup logic so we don't break anyone's existing cluster. But I don't see a reason why we shouldn't add the validation back to StorageTool, to prevent new clusters being created with a non UUID value for cluster ID. WDYT?

Member

@chia7712 chia7712 Jul 11, 2024

Since we released 3.7, 3.7.1 and soon 3.8 without this validation, we should continue to accept it in the broker startup logic so we don't break anyone's existing cluster. But I don't see a reason why we shouldn't add the validation back to StorageTool, to prevent new clusters being created with a non UUID value for cluster ID. WDYT?

I agree that adding the UUID check back would create a compatibility issue. Also, I grepped the code base and don't find any UUID restriction on the cluster ID. Hence, I plan to open JIRAs to address the following changes:

  1. update the docs (https://kafka.apache.org/documentation/#kraft_storage) to note that the "cluster id" can be in a non-UUID format after 3.7 (https://issues.apache.org/jira/browse/KAFKA-17123)
  2. change the type from UUID to string in the testing code to avoid misunderstanding (https://issues.apache.org/jira/browse/KAFKA-17122)
  3. clean up the unused code from StorageTool (https://issues.apache.org/jira/browse/KAFKA-17118)

Member

If this was an accident, I don't think we should make it permanent.

Member

I don't think we should make it permanent.

If the "uuid check" is added back to StorageTool, we have to face following issues:

  1. compatibility (as @soarez mentioned)
  2. the conflicts caused by base64. That happens recently in our cluster. the input cluster id is "lYEJ7pnYNO2ovNYiNRJBGl", but the cluster id stored in meta.properties is "lYEJ7pnYNO2ovNYiNRJBGg".
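
The conflict comes from base64 itself: a 16-byte ID encodes to 22 URL-safe characters, and the final character carries only two significant bits, so decoding and re-encoding normalizes the unused low bits. A small JDK-only illustration (not Kafka code):

import java.util.Base64;

public final class ClusterIdRoundTrip {
    public static void main(String[] args) {
        String input = "lYEJ7pnYNO2ovNYiNRJBGl";             // 22 chars of URL-safe base64
        byte[] bytes = Base64.getUrlDecoder().decode(input);  // decodes to 16 bytes
        String reEncoded = Base64.getUrlEncoder().withoutPadding().encodeToString(bytes);
        // The final character's unused low bits are dropped on decode, so the
        // round trip prints "lYEJ7pnYNO2ovNYiNRJBGg" rather than the original string.
        System.out.println(reEncoded);
    }
}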

Member

If this was an accident, I don't think we should make it permanent.

@ijuma could you clarify your suggestion a bit? Do you mean that we shouldn't make it permanent:

  1. only in the generation of meta.properties (StorageTool)
  2. also in the loading of meta.properties (KafkaRaftServer)
  3. (some other option?)

If you're suggesting we re-introduce loading validation (2), are there any options for operators that have created clusters in production with cluster IDs that would fail that validation? Or should we not mind those?

Member

I feel the most important question is: have we ever converted a cluster ID from a string to a UUID? If the answer is yes, we ought to add the check back to StorageTool and then announce that cluster IDs written by 3.7.0 and 3.7.1 were not validated against the UUID format.

Fortunately, I don't see that usage (string -> UUID) currently. Hence, the compatibility question is in StorageTool: should we allow users to format with a non-UUID cluster ID? If no, we need to handle the compatibility issue and the base64 encoding conflicts. If yes, the side effect is... users may use shorter/longer strings? Sorry for my poor imagination 😢

At any rate, we are going to release 3.8.0. Hence, it would be nice to have a decision ASAP.

Contributor Author

The KRaft formatting tool did have a requirement that the cluster ID had to be a Kafka UUID for a while. However, cluster ID has always been a string when in ZK mode. Because we now support migration from ZK to KRaft, we need to support cluster ID as a string.

If we want to have cluster IDs be UUIDs then I think it would be better to have the discussion on the mailing list than here in this PR. The main question we'd have to answer is how to convert existing string cluster IDs to UUID. Another option, I suppose, would be to require a UUID for new clusters, but allow the arbitrary string for old ones. The main advantage of that would probably just be preventing pathological cluster names (people trying to name their cluster a whitespace string, or an emoji, etc.)

I don't think we will be able to get to the point where the APIs for fetching cluster ID return UUID very easily... it will be a multi-year effort. Unfortunately it's very easy to create string IDs, and very hard to get rid of them.

Member

That makes sense - so this is not an accidental change as it was stated earlier.

Member

@cmccabe thanks for the great explanation. We will leverage your comments to update the docs of StorageTool!
