-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Don't rewrite single small file per partition during Optimize #18938
Don't rewrite single small file per partition during Optimize #18938
Conversation
8349345
to
f8a05d9
Compare
f8a05d9
to
1c9aaa9
Compare
@@ -62,6 +63,7 @@ public enum WriteType | |||
|
|||
// OPTIMIZE only. Coordinator-only | |||
private final boolean recordScannedFiles; | |||
private final Optional<DeltaLakeTableExecuteHandle> executeHandle; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reduce the dependency here.From reading the code, we need in DeltaLakeSplitManager
to know whether we're in an OPTIMIZE
situation from the DeltaLakeTableHandle
. If yes, let's just add a boolean
flag to point this out.
Do reflect this in equals()
and hashCode()
methods as well.
ImmutableList<String> partitionColumns = metadataEntry.getOriginalPartitionColumns().stream() | ||
.map(partitionColumnMapping::get).collect(toImmutableList()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ImmutableList<String> partitionColumns = metadataEntry.getOriginalPartitionColumns().stream() | |
.map(partitionColumnMapping::get).collect(toImmutableList()); | |
ImmutableList<String> partitionColumns = metadataEntry.getOriginalPartitionColumns().stream() | |
.map(partitionColumnMapping::get) | |
.collect(toImmutableList()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gentle reminder.
@@ -198,6 +221,11 @@ private Stream<DeltaLakeSplit> getSplits( | |||
return Stream.empty(); | |||
} | |||
|
|||
// no need to rewrite small file that is the only one in its partition | |||
if (isOptimize && filesPerBucket.get(getPartitionKey(originalPartitionColumns, addAction)) <= 1 && maxScannedFileSizeInBytes.isPresent() && addAction.getSize() < maxScannedFileSizeInBytes.get()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maxScannedFileSizeInBytes.isPresent() && addAction.getSize() < maxScannedFileSizeInBytes.get()
If we have only one file in the partition, there is nothing to optimize, right? The above highlighted condition is not necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if it is bigger than maxScannedFileSizeInBytes then it should be optimzied and splitted into 2 files
return counters; | ||
} | ||
|
||
private String getPartitionKey(List<String> partitionColumns, AddFileEntry addFileEntry) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of this rather ad-hoc mechanism, wouldn't it be enough to make use of io.trino.plugin.deltalake.transactionlog.AddFileEntry#getCanonicalPartitionValues
as a key? I'm guessing that this map can be used for equality.
An alternative would be a list of deserialized partition values io.trino.plugin.deltalake.transactionlog.TransactionLogParser#deserializePartitionValue (probably not necessary though).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was kind of afraid that using map as a key in another map may not be reliable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The map is just an object with a hashcode - i don't think it matters whether you use string or the canonicalPartitionValues
as key in the map.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so if it doesnt matter why do you insist? ;) i am kind of afraid of things like Set<Map<String, Optional<String>>
i can do it but im not a big fun
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm insisting because I don't see the point of having a special logic for geting the stringified "partition key".
@@ -236,6 +264,33 @@ private Stream<DeltaLakeSplit> getSplits( | |||
}); | |||
} | |||
|
|||
private Map<String, Long> countFilesPerPartition(List<String> partitionColumns, List<AddFileEntry> addFileEntries) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC we want to keep (in case we're doing OPTIMIZE
) only the AddFileEntries
with duplicate canonical partition values per partition/table (in case the table is not partitioned).
We don't necessarily need to do counting in order to keep only the entries which have duplicate canonical partition values.
Here's a chatgpt proof of concept
public class Main {
public static void main(String[] args) {
// Create a list of elements (strings)
List<String> elements = Arrays.asList("apple", "banana", "cherry", "banana", "date", "banana", "apple", "fig");
// Create a set to store elements seen before
Set<String> seenElements = new HashSet<>();
// Create a set to store elements that occur more than once
Set<String> multipleOccurrenceElements = new HashSet<>();
// Iterate through the list to find elements that occur more than once
for (String element : elements) {
if (!seenElements.add(element)) {
// If the element was already in the set, it occurs more than once
multipleOccurrenceElements.add(element);
}
}
// Print elements that occur more than once
for (String element : elements) {
if (multipleOccurrenceElements.contains(element)) {
System.out.println(element);
}
}
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep, great idea, thank you
1c9aaa9
to
8a577c1
Compare
maxScannedFileSize, | ||
readVersion); | ||
} | ||
|
||
public DeltaLakeTableHandle forOptimize(boolean recordScannedFiles, DataSize maxScannedFileSize) | ||
public DeltaLakeTableHandle forOptimize(boolean recordScannedFiles, DataSize maxScannedFileSize, boolean isOptimize) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: isOptimize
is redundant because the method name is forOptimize
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you remove the method parameter?
@@ -198,6 +219,11 @@ private Stream<DeltaLakeSplit> getSplits( | |||
return Stream.empty(); | |||
} | |||
|
|||
// no need to rewrite small file that is the only one in its partition | |||
if (isOptimize && !partitionKeysWithMoreThanOneFile.contains(getPartitionKey(originalPartitionColumns, addAction)) && maxScannedFileSizeInBytes.isPresent() && addAction.getSize() < maxScannedFileSizeInBytes.get()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (isOptimize && !partitionKeysWithMoreThanOneFile.contains(getPartitionKey(originalPartitionColumns, addAction)) && maxScannedFileSizeInBytes.isPresent() && addAction.getSize() < maxScannedFileSizeInBytes.get()) { | |
if (isOptimize && partitionKeysWithSingleFile.contains(getPartitionKey(originalPartitionColumns, addAction)) && maxScannedFileSizeInBytes.isPresent() && addAction.getSize() < maxScannedFileSizeInBytes.get()) { |
The positive tense (instead of !
) is a bit easier to follow from maintainer perspective.
Scratch this in case it doesn't fit with the optimized logic for retrieving partitions with more than one file.
1812d35
to
d0ed6dd
Compare
d0ed6dd
to
a9f6fa0
Compare
ImmutableSet.Builder<Map<String, Optional<String>>> partitionsWithAtMostOneFileBuilder = ImmutableSet.builder(); | ||
if (isOptimize) { | ||
partitionsWithAtMostOneFileBuilder.addAll(findPartitionsWithAtMostOneFile(validDataFiles)); | ||
} | ||
Set<Map<String, Optional<String>>> partitionsWithAtMostOneFile = partitionsWithAtMostOneFileBuilder.build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ImmutableSet.Builder<Map<String, Optional<String>>> partitionsWithAtMostOneFileBuilder = ImmutableSet.builder(); | |
if (isOptimize) { | |
partitionsWithAtMostOneFileBuilder.addAll(findPartitionsWithAtMostOneFile(validDataFiles)); | |
} | |
Set<Map<String, Optional<String>>> partitionsWithAtMostOneFile = partitionsWithAtMostOneFileBuilder.build(); | |
Set<Map<String, Optional<String>>> partitionsWithOneFileForOptimize = isOptimize ? findPartitionsWithAtMostOneFile(validDataFiles): Set.of(); |
ddcb629
to
c65e54e
Compare
/test-with-secrets sha=c65e54edd7972f08a28bd9d46a525cfe58b7895a |
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
Outdated
Show resolved
Hide resolved
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
Outdated
Show resolved
Hide resolved
The CI workflow run with tests that require additional secrets has been started: https://github.com/trinodb/trino/actions/runs/6234333370 |
c65e54e
to
92a3941
Compare
Description
Fixes: #18645
Additional context and related issues
Release notes
(x) This is not user-visible or is docs only, and no release notes are required.