From c59823674e8c2ad1075edad47284f4b7603e14cb Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Thu, 31 Oct 2024 15:24:22 +0000 Subject: [PATCH] Disable asynchronous deletes for Hudi (#94) In #82 we added a feature to delete files asynchronously. This has worked great for Delta, which is tolerant of deletes that happen eventually rather than immediately. But it is not working great for Hudi, which does delete-and-replace of the `hoodie.properties` file. This commit disables the feature again for Hudi only. --- .../processing/SparkUtils.scala | 10 ++++++---- .../tables/DeltaWriter.scala | 7 +++++++ .../tables/HudiWriter.scala | 9 +++++++++ .../tables/IcebergWriter.scala | 6 ++++++ .../tables/Writer.scala | 7 +++++++ 5 files changed, 35 insertions(+), 4 deletions(-) diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala index bf271d7..094a3c9 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala @@ -51,10 +51,12 @@ private[processing] object SparkUtils { Resource .make(openLogF >> buildF)(s => closeLogF >> Sync[F].blocking(s.close())) .evalTap { session => - Sync[F].delay { - // Forces Spark to use `LakeLoaderFileSystem` when writing to the Lake via Hadoop - LakeLoaderFileSystem.overrideHadoopFileSystemConf(targetLocation, session.sparkContext.hadoopConfiguration) - } + if (writer.toleratesAsyncDelete) { + Sync[F].delay { + // Forces Spark to use `LakeLoaderFileSystem` when writing to the Lake via Hadoop + LakeLoaderFileSystem.overrideHadoopFileSystemConf(targetLocation, session.sparkContext.hadoopConfiguration) + } + } else Sync[F].unit } } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/DeltaWriter.scala 
b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/DeltaWriter.scala index 497d679..ee4e0a4 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/DeltaWriter.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/DeltaWriter.scala @@ -104,4 +104,11 @@ class DeltaWriter(config: Config.Delta) extends Writer { } } + /** + * Delta tolerates async deletes; in other words when we delete a file, there is no strong + * requirement that the file must be deleted immediately. Delta uses unique file names and never + * re-writes a file that was previously deleted + */ + override def toleratesAsyncDelete: Boolean = true + } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/HudiWriter.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/HudiWriter.scala index b760caa..62ea644 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/HudiWriter.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/HudiWriter.scala @@ -85,4 +85,13 @@ class HudiWriter(config: Config.Hudi) extends Writer { .options(config.hudiWriteOptions) .save(config.location.toString) } + + /** + * Hudi cannot tolerate async deletes. When Hudi deletes a file, the file MUST be deleted + * immediately. + * + * In particular, the `hoodie.properties` file gets deleted and re-created by Hudi, and those + * steps must happen in order. 
+ */ + override def toleratesAsyncDelete: Boolean = false } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/IcebergWriter.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/IcebergWriter.scala index 582d91c..3f22d8b 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/IcebergWriter.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/IcebergWriter.scala @@ -107,4 +107,10 @@ class IcebergWriter(config: Config.Iceberg) extends Writer { } .mkString(", ") + /** + * Iceberg tolerates async deletes; in other words when we delete a file, there is no strong + * requirement that the file must be deleted immediately. Iceberg uses unique file names and never + * re-writes a file that was previously deleted + */ + override def toleratesAsyncDelete: Boolean = true } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/Writer.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/Writer.scala index eb3dceb..aa6e473 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/Writer.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/Writer.scala @@ -29,4 +29,11 @@ trait Writer { /** Write Snowplow events into the table */ def write[F[_]: Sync](df: DataFrame): F[Unit] + + /** + * Whether this lake format tolerates deletes to happen asynchronously instead of immediately + * + * If tolerated, then we use our customized `LakeLoaderFileSystem`. + */ + def toleratesAsyncDelete: Boolean }