[Spark] add auto-compact post commit hook #2414
Conversation
* 2. Then we check if the deprecated property `DeltaConfigs.AUTO_OPTIMIZE` is set. If yes, then
* we return [[AutoCompactType.Legacy]] type.
* 3. Then we check the table property [[DeltaConfigs.AUTO_COMPACT]].
* 4. If none of 1/2/3 are set explicitly, then we return None
It never returns None
We actually do. If you look here in the apply method for AutoCompactType, when we pass it Disabled it returns None.
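For context, here is a minimal sketch of the shape being described; the trait, object, and the exact apply signature below are illustrative, not the PR's actual code:

```scala
// Illustrative sketch only: the companion apply maps the disabled setting to None,
// which is why getAutoCompactType returns an Option[AutoCompactType].
sealed trait AutoCompactType
case object Enabled extends AutoCompactType

object AutoCompactType {
  // `value` stands in for the resolved config/table-property value (hypothetical signature).
  def apply(value: String): Option[AutoCompactType] = value match {
    case "true"  => Some(Enabled)  // auto compaction on
    case "false" => None           // Disabled maps to None, as noted above
    case other   => throw new IllegalArgumentException(s"Unknown auto compact value: $other")
  }
}
```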
* 3. Then we check the table property [[DeltaConfigs.AUTO_COMPACT]].
* 4. If none of 1/2/3 are set explicitly, then we return None
*/
def getAutoCompactType(conf: SQLConf, metadata: Metadata): Option[AutoCompactType] = {
Option[AutoCompactType] -> AutoCompactType
See comment about returning None
* 1. The highest priority is given to [[DeltaSQLConf.DELTA_AUTO_COMPACT_ENABLED]] config.
* 2. Then we check if the deprecated property `DeltaConfigs.AUTO_OPTIMIZE` is set. If yes, then
* we return [[AutoCompactType.Legacy]] type.
* 3. Then we check the table property [[DeltaConfigs.AUTO_COMPACT]].
Shouldn't the new flag have priority?
* hook to compact the files.
* It can be enabled by setting the property to `true`
* Note that the behavior from table property can be overridden by the config:
* [[org.apache.spark.sql.delta.sources.DeltaSQLConfEdge.DELTA_AUTO_COMPACT_ENABLED]]
org.apache.spark.sql.delta.sources.DeltaSQLConfEdge
I guess this is from your private code
* Prioritization:
* 1. The highest priority is given to [[DeltaSQLConf.DELTA_AUTO_COMPACT_ENABLED]] config.
* 2. Then we check if the deprecated property `DeltaConfigs.AUTO_OPTIMIZE` is set. If yes, then
* we return [[AutoCompactType.Legacy]] type.
I also prefer Legacy, but it is currently named Enabled.
* 2. MIN_FILE_SIZE is configurable and defaults to MAX_FILE_SIZE / 2 unless overridden.
* Note: User can use DELTA_AUTO_COMPACT_MAX_FILE_SIZE to override this value.
*/
case object Enabled extends AutoCompactType {
Should we have two distinct objects, Enabled and Legacy, to make it clear?
Legacy is a, well, legacy name; there are only Enabled or Disabled.
* 2. MIN_FILE_SIZE is configurable and defaults to MAX_FILE_SIZE / 2 unless overridden.
* Note: User can use DELTA_AUTO_COMPACT_MAX_FILE_SIZE to override this value.
*/
val defaultMaxFileSize: Long = 16 * 1024 * 1024
I don't see any usage of defaultMaxFileSize
Nice catch thanks! A victim of some code shuffling. Fixed to not need this anymore.
@felipepessoto thanks for the review! Yes, this is based on the implementation used by Databricks and so has been battle tested in many production workloads. We absolutely considered #1156, and are grateful for the contribution. However, due to our extensive experience with this implementation, we felt this was the best path forward. (I'm OOO until early Jan, but will reply to further comments as I'm able.)
@@ -471,6 +471,8 @@ object DeltaOperations {

sealed abstract class OptimizeOrReorg(override val name: String, predicates: Seq[Expression])
  extends OperationWithPredicates(name, predicates)
/** parameter key to indicate whether it's an Auto Compaction */
to indicate Auto Compaction
re #2414 (comment), perhaps we can address post merge
@@ -482,10 +484,12 @@ object DeltaOperations {
/** Recorded when optimizing the table. */
case class Optimize(
    predicate: Seq[Expression],
    zOrderBy: Seq[String] = Seq.empty
    zOrderBy: Seq[String] = Seq.empty,
    auto: Boolean = false
I think autoCompact would be more meaningful.
re #2414 (comment), perhaps we can address post merge
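For illustration, a sketch of how the flag could be passed when an auto compaction run records its commit; the parameter names follow the quoted diff, while the call site itself is an assumption:

```scala
import org.apache.spark.sql.delta.DeltaOperations

// Hypothetical usage: an auto compaction run recorded as an Optimize operation.
// If the rename suggested above is adopted, this would read `autoCompact = true`.
val autoCompactionOp = DeltaOperations.Optimize(
  predicate = Seq.empty,
  zOrderBy = Seq.empty,
  auto = true)
```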
@@ -350,6 +350,11 @@ trait OptimisticTransactionImpl extends TransactionalWrite
  checkDeletionVectorFilesHaveWideBounds = false
}

/** The set of distinct partitions that contain added files by current transaction. */
protected[delta] var partitionsAddedToOpt: Option[mutable.HashSet[Map[String, String]]] = None
Remove the Opt suffix (it's Scala - a type-heavy language after all 😉).
We use this fairly consistently:
- https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala#L62
- https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala#L63
We could propose something here, more generally, but for now I'll leave it as is
@@ -860,6 +865,46 @@ trait OptimisticTransactionImpl extends TransactionalWrite
  }
}

def reportAutoCompactStatsError(e: Throwable): Unit = {
Gotcha! That proves my point about calling the param autoCompact 😉
if (numAdd == 0 && numRemove == 0) return
val collector = createAutoCompactStatsCollector()
if (collector.isInstanceOf[DisabledAutoCompactPartitionStatsCollector]) return
AutoCompactPartitionStats.instance(spark)
Is instance needed?! Why not use apply?
* A subclass of AutoCompactPartitionStatsCollector that's to be used if the config to collect
* auto compaction stats is turned off. This subclass intentionally does nothing.
*/
class DisabledAutoCompactPartitionStatsCollector extends AutoCompactPartitionStatsCollector{
A whitespace is missing before the opening {.
@@ -0,0 +1,376 @@
/*
 * Copyright (2021) The Delta Lake Project Authors.
2024?
class AutoCompactPartitionStats(
    private var maxNumTablePartitions: Int,
    private var maxNumPartitions: Int
) {
Why is this on a separate line, while the other class PartitionStat closes the input arguments on the same line?! Be consistent 🙏
re #2414 (comment), perhaps we can address post merge
* @param minNumFiles The minimum number of files this table-partition should have to trigger
* Auto Compaction in case it has already been compacted once.
*/
def hasSufficientSmallFilesOrHasNotBeenCompacted(minNumFiles: Long): Boolean =
...OrNotCompactedYet?
re #2414 (comment), perhaps we can address post merge
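As a reading aid, a sketch of the predicate that name describes, with assumed field names rather than the PR's actual members:

```scala
// Illustrative sketch, not the PR's code: a per-partition stat with assumed fields.
class PartitionStatSketch(var numSmallFiles: Long, var wasAutoCompacted: Boolean) {
  // Trigger Auto Compaction if this table-partition was never compacted, or if it has
  // accumulated at least `minNumFiles` small files since the last compaction.
  def hasSufficientSmallFilesOrHasNotBeenCompacted(minNumFiles: Long): Boolean =
    !wasAutoCompacted || numSmallFiles >= minNumFiles
}
```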
private var _instance: AutoCompactPartitionStats = null

/** The thread safe constructor of singleton. */
def instance(spark: SparkSession): AutoCompactPartitionStats = {
Why synchronized? Why is there a need for a singleton?
Multiple threads could be committing simultaneously, but we want to keep all the stats together. This gives us a safe way to have a single source of stats across all transactions
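A minimal sketch of the thread-safe lazy singleton pattern being described, with illustrative names and constructor parameters:

```scala
// Illustrative sketch: one shared stats object per driver, created lazily and guarded
// by `synchronized` because multiple transactions may commit (and report stats) at once.
class StatsSketch(maxNumTablePartitions: Int, maxNumPartitions: Int)

object StatsSketch {
  private var _instance: StatsSketch = null

  def instance(maxNumTablePartitions: Int, maxNumPartitions: Int): StatsSketch = synchronized {
    if (_instance == null) {
      _instance = new StatsSketch(maxNumTablePartitions, maxNumPartitions)
    }
    _instance
  }
}
```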
@@ -0,0 +1,332 @@
/*
 * Copyright (2021) The Delta Lake Project Authors.
2024?
Aah... this is a global problem for all files. Let's fix this in a separate PR for all files.
This is not in the roadmap. Is it planned for the 3.1 release?
lgtm after addressing all pending review comments.
.booleanConf
.createWithDefault(true)

val DELTA_AUTO_COMPACT_NON_BLIND_APPEND_ENABLED =
test with this property enabled?
Have a follow-up for some other testing, which we will do post merge
.booleanConf
.createWithDefault(false)

val DELTA_AUTO_COMPACT_MAX_NUM_MODIFIED_PARTITIONS =
same test comment here.
This reverts commit 398547a.
Overall, this PR looks pretty good to me other than the small naming improvements that others have pointed out (@felipepessoto @jaceklaskowski thank you so so much for your reviews). We are nearing the target 3.1 release date of Jan 24, and I would like to cut a release soon with this feature. Unless there are major objections to the core logic of the PR, can we merge this?
I only have one concern, my comment:
Is that right? Is it the same behavior as Databricks? That could be a reason to keep compatibility. I don't see any previous usage of that flag; for example, for optimized write we ignored the old flag and don't use it at all.
And a nit: AUTO_COMPACT could be placed close to the other autoOptimize settings.
Yeah, this is to maintain compatibility with Databricks, so I think we should keep it as is.
Also makes sense, but is trickier than you might expect. We can absolutely look at cleaning this up post merge if possible.
.internal()
.doc(s"Target file size produced by auto compaction. The default value of this config" +
  " is 128 MB.")
.longConf
Should we use bytesConf to be consistent with the optimized write?
I'd say to do it after the 3.1 cut, but before the final release, if possible.
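For reference, a sketch of the suggestion, assuming Delta's buildConf exposes Spark's ConfigBuilder API; the key and default mirror the quoted diff, but this is not the PR's code:

```scala
import org.apache.spark.network.util.ByteUnit

// Sketch: bytesConf accepts human-readable sizes such as "128m", whereas longConf
// only accepts a raw number of bytes. Assumes this lives inside DeltaSQLConf, where
// buildConf prefixes the key with "spark.databricks.delta.".
val DELTA_AUTO_COMPACT_MAX_FILE_SIZE =
  buildConf("autoCompact.maxFileSize")
    .internal()
    .doc("Target file size produced by auto compaction. The default value is 128 MB.")
    .bytesConf(ByteUnit.BYTE)              // instead of .longConf
    .createWithDefault(128L * 1024 * 1024)
```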
Which Delta project/connector is this regarding?
Description
Adds code to support Auto Compaction.
Auto compaction combines small files within partitions to reduce problems due to a proliferation of small files. Auto compaction is implemented as a post commit hook, and so occurs after the write to a table has succeeded. It runs synchronously on the cluster that has performed the write.
You can control the output file size by setting spark.databricks.delta.autoCompact.maxFileSize.
Auto compaction is only triggered for partitions or tables that have at least a certain number of small files. You can optionally change the minimum number of files required to trigger auto compaction by setting spark.databricks.delta.autoCompact.minNumFiles.
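For example, these knobs can be set per session; the property keys come from this description, and the values are only illustrative:

```scala
// Session-level settings; `spark` is an active SparkSession.
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.maxFileSize", (128L * 1024 * 1024).toString) // 128 MB target
spark.conf.set("spark.databricks.delta.autoCompact.minNumFiles", "50") // example threshold
```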
This PR creates a post commit hook, which runs an OptimizeExecutor (from OptimizeTableCommand.scala), which will do the compaction.
Details
We add a post-commit hook in TransactionalWrite that checks whether auto compaction is needed. If the configs are set such that the write meets the criteria (i.e. AC is enabled, enough small files exist, etc.), then the partitions that meet the criteria are reserved and used to build an OptimizeExecutor targeting those partitions, with the appropriate config values.
This runs and will compact the files. Partitions are then released for future compactions to consider.
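In rough pseudocode, the flow described above looks like this; every name in the sketch is illustrative rather than the PR's actual API:

```scala
// Rough sketch of the post-commit auto compaction flow (illustrative names only).
object AutoCompactHookSketch {
  def run(
      autoCompactEnabled: Boolean,
      smallFileCountsByPartition: Map[Map[String, String], Long],
      minNumFiles: Long,
      reserve: Set[Map[String, String]] => Set[Map[String, String]],
      compact: Set[Map[String, String]] => Unit,
      release: Set[Map[String, String]] => Unit): Unit = {
    if (!autoCompactEnabled) return                    // nothing to do when AC is off
    val candidates = smallFileCountsByPartition
      .filter { case (_, numSmallFiles) => numSmallFiles >= minNumFiles }
      .keySet
    if (candidates.isEmpty) return
    val reserved = reserve(candidates)                 // skip partitions another commit is already compacting
    try compact(reserved)                              // OptimizeExecutor compacts only the reserved partitions
    finally release(reserved)                          // free them for future compactions to consider
  }
}
```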
AutoCompact is disabled by default
Configs
There are a number of new configs introduced by this PR, all with prefix spark.databricks.delta.autoCompact. Through a lot of experimentation and user feedback, we found these values to work well across a large range of tables and configurations.
- autoCompact.enabled: should auto compaction run? (default: false)
- autoCompact.maxFileSize: Target file size produced by auto compaction (default: 128 MB)
- autoCompact.minFileSize: Files which are smaller than this threshold (in bytes) will be grouped together and rewritten as larger files by the Auto Compaction (default: half of maxFileSize)
How was this patch tested?
Unit tests in AutoCompactionSuite
Does this PR introduce any user-facing changes?
Yes, please see the Description