Skip to content

Commit

Permalink
Disable asynchronous deletes for Hudi (#94)
Browse files Browse the repository at this point in the history
In #82 we added a feature to delete files asynchronously. This has
worked great for Delta, which is tolerant to deletes that happen
eventually not immediately. But it is not working great for Hudi, which
does delete-and-replace of the `hoodie.properties` file.

This commit disables the feature again for Hudi only.
  • Loading branch information
istreeter authored Oct 31, 2024
1 parent 3952d13 commit c598236
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@ private[processing] object SparkUtils {
Resource
.make(openLogF >> buildF)(s => closeLogF >> Sync[F].blocking(s.close()))
.evalTap { session =>
Sync[F].delay {
// Forces Spark to use `LakeLoaderFileSystem` when writing to the Lake via Hadoop
LakeLoaderFileSystem.overrideHadoopFileSystemConf(targetLocation, session.sparkContext.hadoopConfiguration)
}
if (writer.toleratesAsyncDelete) {
Sync[F].delay {
// Forces Spark to use `LakeLoaderFileSystem` when writing to the Lake via Hadoop
LakeLoaderFileSystem.overrideHadoopFileSystemConf(targetLocation, session.sparkContext.hadoopConfiguration)
}
} else Sync[F].unit
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,11 @@ class DeltaWriter(config: Config.Delta) extends Writer {
}
}

/**
* Delta tolerates async deletes; in other words when we delete a file, there is no strong
* requirement that the file must be deleted immediately. Delta uses unique file names and never
* re-writes a file that was previously deleted
*/
override def toleratesAsyncDelete: Boolean = true

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,13 @@ class HudiWriter(config: Config.Hudi) extends Writer {
.options(config.hudiWriteOptions)
.save(config.location.toString)
}

/**
* Hudi cannot tolerate async deletes. When Hudi deletes a file, the file MUST be deleted
* immediately.
*
* In particular, the `hoodie.properties` file gets deleted and re-created by Hudi, and those
* steps must happen in order.
*/
override def toleratesAsyncDelete: Boolean = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,10 @@ class IcebergWriter(config: Config.Iceberg) extends Writer {
}
.mkString(", ")

/**
* Iceberg tolerates async deletes; in other words when we delete a file, there is no strong
* requirement that the file must be deleted immediately. Iceberg uses unique file names and never
* re-writes a file that was previously deleted
*/
override def toleratesAsyncDelete: Boolean = true
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,11 @@ trait Writer {

/** Write Snowplow events into the table */
def write[F[_]: Sync](df: DataFrame): F[Unit]

/**
* Whether this lake format tolerates deletes to happen asynchronously instead of immediately
*
* If tolerated, then we use our customized `LakeLoaderFileSystem`.
*/
def toleratesAsyncDelete: Boolean
}

0 comments on commit c598236

Please sign in to comment.