Support Auto Compaction #1156
Conversation
I didn't write a design doc or an issue since it's straightforward.
Hi @sezruby - thanks for this PR! It will take some time for us to review and verify it. We will get back to you.
Hi @sezruby - just updating you with the status on our end. We are very busy with planned features for the next release of Delta Lake, as well as with preparation for the upcoming Data and AI summit in June. So, it will take us some time to get back to you on this.
Force-pushed from 32ce3ab to 4434db7
Force-pushed from 63ef599 to e347d28
@vkorukanti Could you review the PR when you have the time? TIA!
Just very tiny nits 😎
@@ -160,23 +160,80 @@ class OptimizeExecutor(

   private val isMultiDimClustering = zOrderByColumns.nonEmpty

-  def optimize(): Seq[Row] = {
+  def optimize(isAutoCompact: Boolean = false, targetFiles: Seq[AddFile] = Nil): Seq[Row] = {
isAutoCompact caught my attention (after auto above in Optimize), but I don't really know how to make these two names the same. I feel they should really be the same, but no idea how. Sorry.
That Optimize is for deltaLog. The current name looks fine to me, but let me know if you have a better one.
-    val maxFileSize = sparkSession.sessionState.conf.getConf(
-      DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE)
+    val maxFileSize = if (isAutoCompact) {
+      sparkSession.sessionState.conf.getConf(DeltaSQLConf.AUTO_COMPACT_MAX_FILE_SIZE)
I think it'd be handy to have a table property too.
We can add it with another PR. In Databricks, there's a targetFileSize table property: https://docs.databricks.com/delta/optimizations/file-mgmt.html#set-a-target-size
Force-pushed from 7479f46 to ab18631
      1
    } else {
      // compaction
      2
Let's start with a couple of vals to give some meaning to these numbers first, e.g. val MULTI_DIM_CLUSTERING = 1 and val COMPACTION_MORE_FILES = 2 (not sure these names are correct, but that's the idea).
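A minimal sketch of the suggestion above, assumed to live inside OptimizeExecutor (the constant names are the reviewer's placeholders, the variable name minNumFilesPerBin is hypothetical, and the meaning of 1 and 2 is inferred from the surrounding diff rather than confirmed by the PR):

```scala
// Sketch only, not the PR's final code: give the bare 1 / 2 descriptive names.
private val MULTI_DIM_CLUSTERING = 1   // presumably: Z-ordering can rewrite even a single file
private val COMPACTION_MORE_FILES = 2  // presumably: plain compaction needs at least two files to merge

val minNumFilesPerBin = if (isMultiDimClustering) {
  MULTI_DIM_CLUSTERING
} else {
  // compaction
  COMPACTION_MORE_FILES
}
```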
@vkorukanti @scottsand-db A gentle reminder. This one is simpler than Optimize Write, so I would like to merge this PR first.
     require(maxFileSize > 0, "maxFileSize must be > 0")

-    val candidateFiles = txn.filterFiles(partitionPredicate)
+    val minNumFilesInDir = optimizeType.minNumFiles
+    val (candidateFiles, filesToProcess) = optimizeType.targetFiles
candidateFiles is kept for statistics; not sure it's still required for debugging.
      )
      .stringConf
      .transform(_.toLowerCase(Locale.ROOT))
      .createWithDefault("table")
Actually, I would prefer to set "partition" as the default, though I set "table" because it's the Databricks default. I don't expect any meaningful gain from "table".
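For reference, a hedged sketch of what the full config definition could look like if the default were flipped to "partition" as discussed; it assumes the usual DeltaSQLConf.buildConf helper and java.util.Locale being in scope, and is not the PR's exact code:

```scala
// Hypothetical sketch, not the PR's exact definition.
val AUTO_COMPACT_TARGET =
  buildConf("autoCompact.target")
    .internal()
    .doc(
      "Target files for auto compaction: 'table' (all files in the table), " +
      "'commit' (files added/updated by the triggering commit), or " +
      "'partition' (files in partitions touched by the triggering commit).")
    .stringConf
    .transform(_.toLowerCase(Locale.ROOT))
    .createWithDefault("partition")
```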
Can you please fix the conflicts?
Force-pushed from d7a00bb to 0019c22
@scottsand-db @zsxwing Could you review the PR?
We are also having this issue; we can't define disjoint conditions for merge and optimize if they are run concurrently.
@pedrosalgadowork which issue do you mean? Is it related to auto compaction?
@scottsand-db @zsxwing @tdas Could you review the PR?
@scottsand-db @zsxwing @tdas - can you help review this PR? It's been open for several months now with no recent updates or comments.
Would be great to have this in Delta 2.3. Is the plan to merge it soon?
Looks like there are some conflicts with the new DV stuff; I had to update a few things while rebasing onto the 2.3 release in my fork. Would be great to get more eyes on this and get it merged in, as this is a highly valuable and missing feature.
Signed-off-by: Eunjin Song <[email protected]> Co-authored-by: Sandip Raiyani <[email protected]>
@dennyglee @scottsand-db @zsxwing @tdas Could you review the PR?
@dennyglee @scottsand-db @zsxwing @tdas Could you review the PR? I'll resolve the conflicts once you start actively reviewing.
@dennyglee @scottsand-db @zsxwing @tdas @allisonport-db Could you review the PR?
Is there any obstacle to the review of this PR?
@sezruby In spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala, method "groupFilesIntoBins": if the total file size is greater than autoCompact.maxFileSize and the total number of files is greater than minNumFiles, but after segregating them into bins by size each individual bin ends up with fewer files than minNumFiles, then the files will not be auto-compacted. Is there a particular reason for doing it this way? I understand it might cause compaction of some small files, but isn't that better than no compaction?
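To illustrate the concern, here is a rough sketch of size-based binning followed by a per-bin file-count check, written under the questioner's reading; it is not necessarily the PR's actual groupFilesIntoBins logic, and all names here are illustrative:

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch only: greedily pack file sizes into bins capped at maxFileSize,
// then keep only bins that individually contain at least minNumFiles files.
// If candidate files are large relative to maxFileSize, every bin may hold
// fewer than minNumFiles files, so nothing gets compacted even though the
// table as a whole has more than minNumFiles small files.
def groupFilesIntoBinsSketch(
    fileSizes: Seq[Long],
    maxFileSize: Long,
    minNumFiles: Int): Seq[Seq[Long]] = {
  val bins = ArrayBuffer(ArrayBuffer.empty[Long])
  fileSizes.foreach { size =>
    if (bins.last.nonEmpty && bins.last.sum + size > maxFileSize) {
      bins += ArrayBuffer.empty[Long]
    }
    bins.last += size
  }
  bins.map(_.toSeq).filter(_.size >= minNumFiles).toSeq
}
```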
I think autoCompact should be available now, since it's documented here.
Yes, since Delta 3.1. This PR can be closed now.
Thanks! Do you know whether autoCompact works when using Spark Structured Streaming with a Delta table as the sink?
Description
Support Auto Compaction as described in:
https://docs.databricks.com/delta/optimizations/auto-optimize.html#how-auto-compaction-works
We can support Auto Compaction via a new post-commit hook and OptimizeCommand with a smaller file size threshold. The feature is controlled by the following configs (a short usage sketch follows the list):
- spark.databricks.delta.autoCompact.enabled (default: false)
- spark.databricks.delta.autoCompact.maxFileSize (default: 128MB)
- spark.databricks.delta.autoCompact.minNumFiles (default: 50)
The configs above are the same as Databricks Auto Compaction.
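A small usage sketch, assuming an active SparkSession named `spark` and an illustrative table path; only the config keys come from this PR, the values shown are the documented defaults except `enabled`:

```scala
// Enable auto compaction for the session.
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.maxFileSize", 128L * 1024 * 1024)
spark.conf.set("spark.databricks.delta.autoCompact.minNumFiles", 50L)

// Any commit to a Delta table can now trigger the post-commit hook, which
// compacts small files using the reduced file size threshold.
spark.range(0, 1000).write.format("delta").mode("append").save("/tmp/delta/auto_compact_demo")
```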
New config1 - autoCompact.maxCompactBytes
As auto compaction is triggered after every table update, I introduced another config to control the total amount of data to be optimized in a single auto compaction operation (an illustrative sketch follows):
- spark.databricks.delta.autoCompact.maxCompactBytes (default: 20GB)
In Databricks, this amount is adjusted based on available cluster resources; the config is a quick and easy workaround for that.
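An illustrative sketch, not necessarily the PR's code, of how such a cap can bound a single pass:

```scala
// Accumulate candidate file sizes until the running total would exceed the
// cap; files beyond that point are left for a later auto compaction pass.
def capByTotalBytes(fileSizes: Seq[Long], maxCompactBytes: Long): Seq[Long] = {
  var total = 0L
  fileSizes.takeWhile { size =>
    total += size
    total <= maxCompactBytes
  }
}
```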
New config2 - autoCompact.target
The PR adds another new config, autoCompact.target, to change the target files for auto compaction:
- spark.databricks.delta.autoCompact.target (default: "partition")
  - table: target all files in the table
  - commit: target only the added/updated files of the commit that triggers auto compaction
  - partition: target only the partitions containing any of the added/updated files of the commit that triggers auto compaction
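A rough sketch, illustrative only and not the PR's actual API, of how the three modes could translate into file selection (the function and parameter names are hypothetical):

```scala
import org.apache.spark.sql.delta.actions.AddFile

// `allFiles` is every live file in the table; `committedFiles` is the set of
// files added/updated by the commit that triggered auto compaction.
def selectTargetFiles(
    allFiles: Seq[AddFile],
    committedFiles: Seq[AddFile],
    target: String): Seq[AddFile] = target match {
  case "table"  => allFiles
  case "commit" => committedFiles
  case "partition" =>
    val touchedPartitions = committedFiles.map(_.partitionValues).toSet
    allFiles.filter(f => touchedPartitions.contains(f.partitionValues))
  case other =>
    throw new IllegalArgumentException(s"Unknown autoCompact.target: $other")
}
```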
Users usually write/update data only for a few partitions and don't expect changes in other partitions. If the table is not optimized, the table setting might unexpectedly cause conflicts with other partitions, and the added/updated files in the triggering commit might not be optimized if there are many small files in other partitions.
Fixes #815
How was this patch tested?
Unit tests
Does this PR introduce any user-facing changes?
Yes, it adds the Auto Compaction feature.