Core: fix NPE with HadoopFileIO because FileIOParser doesn't serialize Hadoop configuration #10926
Conversation
```java
public HadoopFileIO() {
  // Create a default hadoopConf as it is required for the object to be valid.
  // E.g. newInputFile would throw NPE with hadoopConf.get() otherwise.
  this.hadoopConf = new SerializableConfiguration(new Configuration())::get;
```
`FileIOParser` doesn't serialize and deserialize the Hadoop configuration, so the deserialized io object is not valid because `hadoopConf` is null. An NPE would be thrown if `newInputFile` is called on the deserialized object.

I'm not sure if we want to change `FileIOParser` to serialize the Hadoop `Configuration` object or not. Regardless, this change seems reasonable, as this class requires `hadoopConf` to not be null.
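For context, a minimal sketch of how the NPE surfaces after a JSON round trip (assuming the `FileIOParser.toJson`/`fromJson` and `HadoopFileIO` APIs discussed in this thread; the file path is just an example):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.FileIOParser;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;

public class FileIOParserNpeRepro {
  public static void main(String[] args) {
    HadoopFileIO io = new HadoopFileIO(new Configuration());

    // The JSON carries only the impl class name and the string properties;
    // the Hadoop configuration is dropped.
    String json = FileIOParser.toJson(io);

    // A null conf is passed through to the reconstructed HadoopFileIO.
    FileIO deserialized = FileIOParser.fromJson(json, null);

    // hadoopConf is null on the deserialized instance, so this throws NPE.
    deserialized.newInputFile("file:/tmp/example.parquet");
  }
}
```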
I thought about this problem a bit more. The one downside I see with this change is that `HadoopFileIO` will typically be instantiated via `CatalogUtil.loadFileIO`, which will then set the `Conf`. Also, the javadoc of the constructor mentions that the conf must be set by calling `setConf()`.

Instead of serializing the conf in the parser, maybe the conf should be initialized in `FileIOParser.fromJson(String)` instead of passing a null conf in `return fromJson(json, null);` when the `HadoopFileIO` is created?
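A rough sketch of that suggestion (illustrative only, not an actual patch; note the discussion below about the Hadoop dependency this would pull into the parser):

```java
// Hypothetical overload: hand fromJson a fresh Configuration instead of null,
// so the deserialized HadoopFileIO starts out valid.
public static FileIO fromJson(String json) {
  return fromJson(json, new Configuration());
}
```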
If the `FileIO` is `Configurable`, create a default `Configuration` object and call `setConf()`. Is that what you mean? That would add a Hadoop class dependency to the `FileIOParser`, though.

I have also thought about implementing the serialization of the Hadoop `Configuration` in the `FileIOParser`, but it would add some complexity to the `FileIOParser`, and we likely won't achieve 100% fidelity with the original object. E.g., we may only serialize the key-value string pairs from the configuration as a JSON object. It would also have the problem of adding a Hadoop class dependency to the `FileIOParser`.

I also thought about using this method from `SerializationUtil`? But it is Java serialization (not JSON serialization).
```java
/**
 * Serialize an object to bytes. If the object implements {@link HadoopConfigurable}, its Hadoop
 * configuration will be serialized into a {@link SerializableConfiguration}.
 *
 * @param obj object to serialize
 * @return serialized bytes
 */
public static byte[] serializeToBytes(Object obj) {
  return serializeToBytes(obj, conf -> new SerializableConfiguration(conf)::get);
}
```
Ah, you're right that this would add a Hadoop dependency, and we don't want that. Rather than setting a conf in the constructor, maybe we should have some handling in `conf()`, such as:
```diff
 public Configuration conf() {
-  return hadoopConf.get();
+  return Optional.ofNullable(hadoopConf).map(Supplier::get).orElseGet(Configuration::new);
 }
```
We would need to adjust all the places that call `hadoopConf.get()` to call `conf()` after this change. That way we wouldn't need to unnecessarily instantiate a conf in the constructor when we don't have to, wdyt?
The `conf()` method is fine.

`FileIOParser` doesn't faithfully serialize and deserialize the `HadoopFileIO` (the Hadoop configuration is not carried over). The deserialized `HadoopFileIO` may miss important configs, which can be a problem.

From `CatalogUtil`, there are 3 components defining a `FileIO`, and `FileIOParser` is missing the `conf`:

```java
FileIO loadFileIO(String impl, Map<String, String> properties, Object hadoopConf)
```

I am wondering if we should change `FileIOParser` to serialize and deserialize the Hadoop `Configuration` when the `FileIO` is `HadoopConfigurable`. We can probably only serialize the key-value string pairs from the `Configuration` as a JSON object (kind of a read-only copy).
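A sketch of what that read-only copy could look like (the helper name is hypothetical; `Configuration` is iterable over its resolved key-value string pairs):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;

// Hypothetical helper: flatten the resolved key-value pairs of a Configuration
// into a map that can be written as a flat JSON object. Defaults resolved on
// the writer's host are baked in; the reader's defaults are never consulted.
static Map<String, String> confToMap(Configuration conf) {
  Map<String, String> props = new HashMap<>();
  for (Map.Entry<String, String> entry : conf) {
    props.put(entry.getKey(), entry.getValue());
  }
  return props;
}
```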
BTW, `ResolvingFileIO` and `HadoopConfigurable` also have a Hadoop class dependency. There was a discussion of potentially moving `HadoopCatalog` to a separate `iceberg-hadoop` module. I guess we can't move `ResolvingFileIO` and `HadoopConfigurable` then.
I have seen that we use `new Configuration(false)` in the code, so we allow the user to provide a trimmed configuration, and in this case the serialized config is quite small for the binary serialization. We might have to do something similar for the JSON serialization.

Are you saying to trim away the key-value pairs that match the entries from the default configuration, as `new Configuration(false)` does? We can potentially do that, but it still has the implication that it depends on the runtime env: if the other side (deserialization) has a different env (default config), the result can be different.
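For reference, the difference between the two `Configuration` constructors (the override below is just an example):

```java
// Loads the default resources (core-default.xml, core-site.xml, ...) from the
// classpath; serializing this carries hundreds of default entries.
Configuration full = new Configuration();

// Starts empty: only explicitly set entries exist, so the serialized form is
// small, but the deserializing side must supply its own defaults.
Configuration trimmed = new Configuration(false);
trimmed.set("fs.defaultFS", "hdfs://namenode:8020"); // example override
```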
`ManifestListReadTask.rows()` and `ManifestListReadTask.file()` use the io to get the new input file, like `io.newInputFile(manifestListLocation)`. `FileIO` is also used to read from manifest files; `ManifestFiles.read` is a widely used API.
```java
private CloseableIterable<? extends ContentFile<?>> files(Schema fileProjection) {
  switch (manifest.content()) {
    case DATA:
      return ManifestFiles.read(manifest, io, specsById).project(fileProjection);
    case DELETES:
      return ManifestFiles.readDeleteManifest(manifest, io, specsById).project(fileProjection);
    ...
  }
}
```
`FileIO` serialization will become even more complicated when manifest file encryption arrives here. We will need to apply the encryption for the `FileIO`.

From the usage, `EncryptingFileIO` seems to be used only as a derived object; it is not part of the table ops state, so it shouldn't need to be serialized. Only the original `FileIO` and `EncryptionManager` need to be serialized.
```java
protected EncryptedOutputFile newManifestOutputFile() {
  String manifestFileLocation =
      ops.metadataFileLocation(
          FileFormat.AVRO.addExtension(commitUUID + "-m" + manifestCount.getAndIncrement()));
  return EncryptingFileIO.combine(ops.io(), ops.encryption())
      .newEncryptingOutputFile(manifestFileLocation);
}
```
> but it still has the implication that it depends on the runtime env. if the other side (deserialization) has a different env (default config), this can be different.

Yes, but I think (not tested) we suffer from the same issue if the configuration provided through the Catalog is created as `Configuration(false)`.

> FileIO is used to read from manifest file. ManifestFiles.read is a widely used API.

Yes, but in this case the reader provides the `FileIO` independently from the Task.

> only original FileIO and EncryptionManager need to be serialized.

Yes. Encryption should be handled.
@pvary if I understand correctly, your point is that `FileIO` probably shouldn't be part of the task state for `BaseEntriesTable.ManifestReadTask` and `AllManifestsTable.ManifestListReadTask`? Then we wouldn't need to serialize `FileIO` for those manifest-related tasks. That would be a fair question. It seems like a large/challenging refactoring. Looking for other folks' take on this.

Regardless, the issue remains that `FileIOParser` doesn't serialize `HadoopFileIO` faithfully. I don't know if the REST catalog has any need to use it to JSON-serialize `FileIO` in the future.
> if I understand correctly, your point is that FileIO probably shouldn't be part of the task state for BaseEntriesTable.ManifestReadTask, AllManifestsTable.ManifestListReadTask? [..] It seems like a large/challenging refactoring. Looking for other folks' take on this.

Yes, this seems strange and problematic to me. I was not able to find an easy solution yet. I was hoping others with better knowledge might have some ideas.

> Regardless, the issue remains that FileIOParser doesn't serialize HadoopFileIO faithfully. I don't know if REST catalog has any need to use it to JSON serialize FileIO in the future.

My point here is that, since we use `Configuration(false)` in some cases, and given the way the current serialization works, we already don't serialize `HadoopFileIO` faithfully. So if we don't find a solution for getting rid of the `FileIO`, we might as well write our own "unfaithful" serialization which mimics the way the current serialization works.
@pvary coming back to the size concern of serializing the Hadoop Configuration: that is already the case with Java serialization for the manifest tasks, so implementing JSON serialization for the Hadoop configuration would not make things worse.

Whether we can avoid the need for `FileIO` in the manifest tasks can be a separate, larger discussion.
I don't think that we should change how this works. A Hadoop Configuration MUST be provided externally. FileIO serialization is not intended to send the entire Hadoop Configuration and should remain separate.
This makes sense. We already have some consensus that the current PR needs to be updated. We just don't have a consensus on how to fix the NPE problem where a deserialized `HadoopFileIO` has a null Hadoop `Configuration` object.
This is not super clear to me. What if users loaded the
@pvary had a related concern on the size of the Hadoop Configuration entries if we serialize all the properties (most of them are default properties loaded from the host). What if we just serialize the overridden properties (assuming the sender and receiver sides have the same host-level Hadoop conf xml)? Anyway, the current NPE problem with a deserialized `HadoopFileIO` remains. @rdblue, should we adopt @nastra's suggestion of loading the Hadoop Configuration in `FileIOParser.fromJson`?
There is already a way to pass the
Then we need to load the Hadoop configuration from 3 other JSON parsers for the manifest and files tasks. Those classes are also in iceberg-core. These two methods are the pair that callers use. It is probably not reasonable for callers to load the Hadoop configuration when calling

Ideally, this method should load the Hadoop Configuration object if the
Let me recap the problem and the potential options.

problem: `FileIOParser` doesn't serialize the Hadoop configuration, so a deserialized `HadoopFileIO` carries a null conf and `newInputFile` throws an NPE.

Option 1: load a default Hadoop `Configuration` during deserialization so the restored `HadoopFileIO` is valid. However, I feel the Hadoop class dependency is inevitable today for two reasons. (1) … Personally, I would prefer to go forward with this stop-gap solution for now while we are gathering more consensus on the long-term solutions (one listed below).

Option 2: change … Internally, …

Option 2 is a long-term solution IMO.
@stevenzwu for Option 1, wouldn't https://github.com/apache/iceberg/pull/10926/files#r1718243019 also solve the issue with the NPE without introducing a Hadoop dependency on the Parser itself?
Agree that it is better to avoid the Hadoop dep on the `FileIOParser`. I will make a small change to your proposal to cache and set the loaded Hadoop `Configuration`.
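One possible shape of that change (a sketch only, not the actual patch; the field and helper names are hypothetical, and reflection is used here purely to illustrate avoiding a compile-time Hadoop import in the parser):

```java
// Hypothetical: create and cache a Hadoop Configuration reflectively so that
// FileIOParser itself never references a Hadoop class at compile time.
private static volatile Object cachedHadoopConf;

private static Object loadHadoopConf() {
  if (cachedHadoopConf == null) {
    try {
      cachedHadoopConf =
          Class.forName("org.apache.hadoop.conf.Configuration")
              .getDeclaredConstructor()
              .newInstance();
    } catch (ReflectiveOperationException e) {
      return null; // Hadoop is not on the classpath; fall back to a null conf
    }
  }
  return cachedHadoopConf;
}

public static FileIO fromJson(String json) {
  return fromJson(json, loadHadoopConf());
}
```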
```diff
@@ -74,7 +74,7 @@ public HadoopFileIO(SerializableSupplier<Configuration> hadoopConf) {
 }

 public Configuration conf() {
-  return hadoopConf.get();
+  return getConf();
```
I don't think it's enough to just update this call. You'd basically need to update every place in this class that uses `hadoopConf.get()` to `conf()` or to `getConf()`.
you are absolutely correct. sorry that I missed this earlier. updated the PR.
LGTM with one suggestion
I have a minor comment; otherwise, it LGTM.
```java
// Create a default hadoopConf as it is required for the object to be valid.
// E.g. newInputFile would throw NPE with getConf() otherwise.
if (hadoopConf == null) {
  this.hadoopConf = new SerializableConfiguration(new Configuration())::get;
```
Is there a potential for a race condition here? Do you see a need to add synchronization to protect this instantiation?
Yeah, `FileIO` can potentially be used by multiple threads. Let me add synchronization.
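Something like the following double-checked initialization (a sketch; the field must be volatile for this pattern to be safe under the Java memory model):

```java
private volatile SerializableSupplier<Configuration> hadoopConf;

public Configuration getConf() {
  if (hadoopConf == null) {
    synchronized (this) {
      if (hadoopConf == null) {
        // Create a default conf lazily so newInputFile etc. don't NPE.
        this.hadoopConf = new SerializableConfiguration(new Configuration())::get;
      }
    }
  }
  return hadoopConf.get();
}
```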
This can happen in the `FileIOParser` serialization and deserialization scenario. `FileIOParser` doesn't serialize and deserialize the Hadoop configuration, so the deserialized io object is not valid because `hadoopConf` is null. An NPE would be thrown if `newInputFile` is called on the deserialized object.

This is a blocker for Flink to switch to the JSON parser for scan tasks of metadata tables (like manifests etc.).