diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala
index bf271d7..094a3c9 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala
@@ -51,10 +51,12 @@ private[processing] object SparkUtils {
     Resource
       .make(openLogF >> buildF)(s => closeLogF >> Sync[F].blocking(s.close()))
       .evalTap { session =>
-        Sync[F].delay {
-          // Forces Spark to use `LakeLoaderFileSystem` when writing to the Lake via Hadoop
-          LakeLoaderFileSystem.overrideHadoopFileSystemConf(targetLocation, session.sparkContext.hadoopConfiguration)
-        }
+        if (writer.toleratesAsyncDelete) {
+          Sync[F].delay {
+            // Forces Spark to use `LakeLoaderFileSystem` when writing to the Lake via Hadoop
+            LakeLoaderFileSystem.overrideHadoopFileSystemConf(targetLocation, session.sparkContext.hadoopConfiguration)
+          }
+        } else Sync[F].unit
       }
   }
 
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/DeltaWriter.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/DeltaWriter.scala
index 497d679..ee4e0a4 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/DeltaWriter.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/DeltaWriter.scala
@@ -104,4 +104,11 @@ class DeltaWriter(config: Config.Delta) extends Writer {
     }
   }
 
+  /**
+   * Delta tolerates async deletes; in other words, when we delete a file, there is no strong
+   * requirement that the file must be deleted immediately. Delta uses unique file names and never
+   * re-writes a file that was previously deleted
+   */
+  override def toleratesAsyncDelete: Boolean = true
+
 }
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/HudiWriter.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/HudiWriter.scala
index b760caa..62ea644 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/HudiWriter.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/HudiWriter.scala
@@ -85,4 +85,13 @@ class HudiWriter(config: Config.Hudi) extends Writer {
       .options(config.hudiWriteOptions)
      .save(config.location.toString)
   }
+
+  /**
+   * Hudi cannot tolerate async deletes. When Hudi deletes a file, the file MUST be deleted
+   * immediately.
+   *
+   * In particular, the `hoodie.properties` file gets deleted and re-created by Hudi, and those
+   * steps must happen in order.
+   */
+  override def toleratesAsyncDelete: Boolean = false
 }
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/IcebergWriter.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/IcebergWriter.scala
index 582d91c..3f22d8b 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/IcebergWriter.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/IcebergWriter.scala
@@ -107,4 +107,10 @@ class IcebergWriter(config: Config.Iceberg) extends Writer {
       }
       .mkString(", ")
 
+  /**
+   * Iceberg tolerates async deletes; in other words, when we delete a file, there is no strong
+   * requirement that the file must be deleted immediately. Iceberg uses unique file names and never
+   * re-writes a file that was previously deleted
+   */
+  override def toleratesAsyncDelete: Boolean = true
 }
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/Writer.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/Writer.scala
index eb3dceb..aa6e473 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/Writer.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/Writer.scala
@@ -29,4 +29,11 @@ trait Writer {
 
   /** Write Snowplow events into the table */
   def write[F[_]: Sync](df: DataFrame): F[Unit]
+
+  /**
+   * Whether this lake format tolerates deletes happening asynchronously instead of immediately
+   *
+   * If tolerated, then we use our customized `LakeLoaderFileSystem`.
+   */
+  def toleratesAsyncDelete: Boolean
 }
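For context when reviewing, here is a minimal, self-contained sketch (not part of the diff) of the contract the new `toleratesAsyncDelete` member encodes and how `SparkUtils` consumes it. The `AsyncDeleteContract` stub and the `maybeOverrideFs` helper are hypothetical names used only for illustration; the real wiring is the change to `SparkUtils.scala` above.

import cats.effect.Sync

// Hypothetical stand-in for the slice of the Writer trait added in this diff.
trait AsyncDeleteContract {
  def toleratesAsyncDelete: Boolean
}

object AsyncDeleteSketch {

  // Mirrors the gating added to SparkUtils: only formats that never re-write a
  // previously deleted path (Delta, Iceberg) get the LakeLoaderFileSystem
  // override; Hudi keeps the default Hadoop FileSystem so its
  // delete-then-recreate of `hoodie.properties` stays strictly ordered.
  def maybeOverrideFs[F[_]: Sync](writer: AsyncDeleteContract)(overrideFs: => Unit): F[Unit] =
    if (writer.toleratesAsyncDelete) Sync[F].delay(overrideFs)
    else Sync[F].unit
}

Routing the decision through the Writer trait keeps the Spark/Hadoop wiring format-agnostic: each table format declares its delete semantics next to its write logic, and SparkUtils only branches on the flag.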