-
Notifications
You must be signed in to change notification settings - Fork 5.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add property to skip empty files in Hive Connector #22727
Conversation
@@ -195,6 +195,9 @@ Property Name Description | |||
S3SelectPushdown. | |||
|
|||
``hive.metastore.load-balancing-enabled`` Enable load balancing between multiple Metastore instances | |||
|
|||
``hive.skip-empty-files`` Enable skipping empty files. Otherwise, it will produce an ``false`` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a false
explicitly specify the default value of this property
What is a "false error"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, it's a table.
@@ -1816,4 +1818,17 @@ public HiveClientConfig setAffinitySchedulingFileSectionSize(DataSize affinitySc | |||
this.affinitySchedulingFileSectionSize = affinitySchedulingFileSectionSize; | |||
return this; | |||
} | |||
|
|||
@Config("hive.skip-empty-files") | |||
@ConfigDescription("Enables skip of parquet empty files avoiding output error") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Skip empty files rather than reporting an error
What about ORC files?
|
||
@Config("hive.skip-empty-files") | ||
@ConfigDescription("Enables skip of parquet empty files avoiding output error") | ||
public HiveClientConfig setSkipEmptyFilesEnabled(boolean parallelParsingOfPartitionValuesEnabled) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change argument name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
private void createQueryFailRunner() | ||
throws Exception | ||
{ | ||
logger.info("Creating 'QueryFailRunner'"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no logs in tests please.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
protected QueryRunner createQueryRunner() | ||
throws Exception | ||
{ | ||
logger.info("Creating 'QueryRunner'"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no logs in tests please, only failure messages. Our logs are already too full of random messages to efficiently locate the actual failures.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
*/ | ||
private static void deleteMetadata(File temporaryDirectory) | ||
{ | ||
File[] data = temporaryDirectory.listFiles(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
data --> files or metadataFiles or something like that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
boolean isDirectory = file.isDirectory(); | ||
if (file.delete()) { | ||
if (isDirectory) { | ||
logger.info(" deleted temporary directory: " + filePath); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no logs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
deleteAndLog(temporaryDirectory); | ||
} | ||
|
||
private static void deleteAndLog(File file) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know JUNit has easier ways to manage temp files these days. Does TestNG?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could use method FileUtils.deleteDirectory from commons-io but that module does not currently have that dependency and it's probably not worth it to add it just for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In JUnit you can just annotate a field with @TempFile
and the framework manages creating and disposing it. Again, I don't know if TestNG has that yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't found anything similar in TestNG
if (resourceUrl == null) { | ||
throw new RuntimeException("Cannot find resource path for table name: " + tableName); | ||
} | ||
logger.info("resource url: " + resourceUrl); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no logs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
*/ | ||
private void executeCreationTestAndDropCycle(DistributedQueryRunner queryRunner, String tableName, String externalLocation, boolean shouldFail, @Language("RegExp") String errorMessage) | ||
{ | ||
logger.info("Executing Create - Test - Drop for: " + tableName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no logs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, local build of doc looks good! One very minor nit of a suggestion to consider.
@@ -195,6 +195,9 @@ Property Name Description | |||
S3SelectPushdown. | |||
|
|||
``hive.metastore.load-balancing-enabled`` Enable load balancing between multiple Metastore instances | |||
|
|||
``hive.skip-empty-files`` Enable skipping empty files. Otherwise, it will produce an ``false`` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! (docs)
Pull updated branch, local docs build, looks good. Thanks!
Nit suggestion for the release note entry:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this looks good. Could you look into adding a test when bucketed execution is enabled? I just want to make sure this continues to work when there's a definitive order required of the underlying files.
|
||
private Iterator<HiveFileInfo> remoteIterator = Collections.emptyIterator(); | ||
|
||
public HiveFileIterator( | ||
Path path, | ||
ListDirectoryOperation listDirectoryOperation, | ||
NamenodeStats namenodeStats, | ||
NestedDirectoryPolicy nestedDirectoryPolicy) | ||
NestedDirectoryPolicy nestedDirectoryPolicy, | ||
boolean skipEmptyFiles)// Tipo Boolean |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
boolean skipEmptyFiles)// Tipo Boolean | |
boolean skipEmptyFiles) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bucketed execution is enabled by default. You mean add a test when it is disabled?
@@ -195,6 +195,9 @@ Property Name Description | |||
S3SelectPushdown. | |||
|
|||
``hive.metastore.load-balancing-enabled`` Enable load balancing between multiple Metastore instances | |||
|
|||
``hive.skip-empty-files`` Enable skipping empty files. Otherwise, it will produce an ``false`` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, it's a table.
* @param shouldFail {@code true} if the table creation should fail, {@code false} otherwise | ||
* @param errorMessage a {@link String} containing the expected error message. Will be checked if {@code shouldFail} is {@code true} | ||
*/ | ||
private void checkBucketedResult(DistributedQueryRunner queryRunner, String tableName, boolean shouldFail, @Language("RegExp") String errorMessage) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
boolean parameters to change behavior are an antipattern that increases cyclomatic complexity. Use two methods, one for things that should fail and one for things that should pass.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
throws Exception | ||
{ | ||
// obtains the root resource directory in order to create temporary tables | ||
URL url = TestHiveSkipEmptyFiles.class.getClassLoader().getResource("."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Temporary files should be located with either the Java temp fiels API or TestNG APIs, not the classloader.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
// obtains the root resource directory in order to create temporary tables | ||
URL url = TestHiveSkipEmptyFiles.class.getClassLoader().getResource("."); | ||
if (url == null) { | ||
throw new RuntimeException("Could not obtain resource URL"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This must be more specific. Never throw a raw RuntimeException.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
if (!created) { | ||
throw new RuntimeException("Could not create resource directory: " + temporaryDirectory.getPath()); | ||
} | ||
File firstParquetFile = new File(temporaryDirectory, randomUUID().toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're reinventing a lot of temp file logic that already exists in the JDK, and probably in TestNG too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
* @param temporaryDirectory a {@link File} pointing to the directory to delete | ||
* @param hasPattern a {@code true} if it is necessary to delete only one directory file, {@code false} otherwise | ||
*/ | ||
private static void deleteMetadata(File temporaryDirectory, boolean hasPattern) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You've done far more work here than you needed, I'm afraid.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
String tableName = "skip_empty_files_success"; | ||
File resourcesLocation = generateMetadata(tableName); | ||
executeCreationTestAndDropCycle(queryRunner, tableName, getResourceUrl(tableName), false, null); | ||
deleteMetadata(resourcesLocation, false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
deletion in tests should be handled in an after test method to ensure cleanup in the face of exceptions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
Hi, I’ve implemented all the requested changes, but one check failed, and I’m not able to reproduce it in local. Do you know why could be caused or could you take a look when you have a chance? Thank you |
@cvarelad-denodo it might be a flaky test. I've kicked it off again to verify. If it is, we'll create an issue for that, and re-review--thanks! |
Hi @elharo. I’ve implemented all the requested changes. Could you take a look when you have a chance and let me know if it's now ready to merge? Thank you |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly nits
queryRunner.execute(insertQuery); | ||
} | ||
if (replaceDataFileByEmptyFile) { | ||
Files.delete(Arrays.stream(requireNonNull(partitionDirectory.toFile().listFiles((dir, name) -> !name.endsWith(".crc")))).iterator().next().toPath()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we break this up into multiple lines for readability?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
private void checkBucketedResult(DistributedQueryRunner queryRunner, String tableName) | ||
{ | ||
try { | ||
@Language("SQL") String selectQuery = format("SELECT * FROM %s.\"%s\".\"%s\"", CATALOG, | ||
SCHEMA, tableName); | ||
MaterializedResult result = queryRunner.execute(selectQuery); | ||
assertEquals(5, result.getRowCount()); | ||
} | ||
finally { | ||
@Language("SQL") String dropQuery = format("DROP TABLE IF EXISTS %s.\"%s\".\"%s\"", CATALOG, | ||
SCHEMA, tableName); | ||
queryRunner.execute(dropQuery); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we refactor this method and the one below to just check the output? We can move the try/finally to the test cases themselves, and have a method in the finally clause that deletes the table.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, did you also consider querying the $files
hidden column?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case it follows the same flow as a normal query and it doesn't add anything new.
@Test | ||
public void testSkipEmptyFilesBucketSuccessful() | ||
{ | ||
String tableName = "skip_empty_files_bucket_success"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would just inline this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -118,6 +119,11 @@ public class TestQuickStatsProvider | |||
USE_LIST_DIRECTORY_CACHE, | |||
"Directory list caching", | |||
false, | |||
false), | |||
booleanProperty( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In a real execution HiveSessionProperties always includes value for that property because it gets it from HiveClientConfig in the constructor. Therefore, the code assumes that the property always exists.
In tests as you use a TestingConnectorSession, you have to add that property to work during the creation of the HiveDirectoryContext when calling buildQuickStats().
Ignore empty files when hive.skip_empty_files is set to true. Otherwise, it produces an error when iterating through these files.
1d23e92
to
7f6b572
Compare
7f6b572
to
cc72732
Compare
Hi, I've implemented all the requested changes but I'm not able to reproduce the failing check in local again. Could you take a look when you have a chance and let me know if it's now ready to merge? Thank you |
private DistributedQueryRunner queryFailRunner; | ||
private DistributedQueryRunner queryBucketRunner; | ||
private DistributedQueryRunner queryBucketFailRunner; | ||
File temporaryDirectory; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private
Ignore empty files when hive.skip_empty_files is set to true (false by default). Otherwise, it produces an error when iterating through these files.
Description
This support allows to configure a hive connector property to decide if Presto should ignore these empty files or it should fail (default behavior). The new connector property is called hive.skip_empty_files. The following diagram shows the flow of the property to the HiveFileIterator:
Motivation and Context
Currently, if Presto reads a Parquet folder where one of the files is an empty file the query fails. This is a problem in some environments.
Solves issue: 14759
Impact
No impact
Test Plan
TestHiveSkipEmptyFiles contains two tests. Both reads a directory that contains an empty file. One has the property set to true and the query execution finish with no errors skipping the empty file. The other one has the property set to false and the execution fails as before these changes
Contributor checklist
Release Notes
Please follow release notes guidelines and fill in the release notes below.