diff --git a/build.sbt b/build.sbt index ebfe5cf3..9fc990a3 100755 --- a/build.sbt +++ b/build.sbt @@ -76,7 +76,7 @@ lazy val awsHudi: Project = project .withId("awsHudi") .settings(BuildSettings.awsSettings ++ BuildSettings.hudiAppSettings) .settings(target := (hudi / target).value / "aws") - .settings(libraryDependencies ++= Dependencies.awsDependencies) + .settings(libraryDependencies ++= Dependencies.awsDependencies ++ Dependencies.hudiAwsDependencies) .dependsOn(core) .enablePlugins(BuildInfoPlugin, JavaAppPackaging, SnowplowDockerPlugin) .dependsOn(hudi % "runtime->runtime") diff --git a/config/config.aws.reference.hocon b/config/config.aws.reference.hocon index 3531756d..39ec961e 100644 --- a/config/config.aws.reference.hocon +++ b/config/config.aws.reference.hocon @@ -37,6 +37,12 @@ # -- The number of batches of events which are pre-fetched from kinesis. # -- Increasing this above 1 is not known to improve performance. "bufferSize": 3 + + # -- Name of this KCL worker used in the dynamodb lease table + "workerIdentifier": ${HOSTNAME} + + # -- Duration of shard leases. KCL workers must periodically refresh leases in the dynamodb table before this duration expires. + "leaseDuration": "10 seconds" } "output": { @@ -47,15 +53,13 @@ # -- For a S3 bucket, the uri should start with `s3a://` "location": "s3a://my-bucket/events" - # -- Atomic columns which should be brought to the "left-hand-side" of the events table, to - # -- enable Delta's Data Skipping feature. - # -- The Delta table property `delta.dataSkippingNumIndexedCols` will be set to the size of the list - "dataSkippingColumns": [ - "load_tstamp" - "collector_tstamp" - "derived_tstamp" - "dvce_created_tstamp" - ] + # -- Any valid Delta table property + # -- This can be blank in most setups because the loader already sets sensible defaults. + "deltaTableProperties": { + "delta.dataSkippingStatsColumns": "load_tstamp,collector_tstamp,derived_tstamp,dvce_created_tstamp" + "delta.checkpointInterval": "50" + } + } ## -- HUDI OUTPUT FORMAT -- ## @@ -74,8 +78,8 @@ # "hoodie.metadata.index.column.stats.column.list": "load_tstamp,collector_tstamp,derived_tstamp,dvce_created_tstamp" # } # -# # -- Any valid hudi table option -# "hudiWriteOptions": { +# # -- Any valid hudi table property +# "hudiTableProperties": { # "hoodie.keygen.timebased.output.dateformat": "yyyy-MM-dd" # } # } @@ -110,9 +114,14 @@ # "glue.id": "123456789" # } # } +# +# # -- Any valid Iceberg table property +# # -- This can be blank in most setups because the loader already sets sensible defaults. +# "icebergTableProperties": { +# "write.metadata.metrics.column.event_id": "count" +# } # } - "bad": { # -- output kinesis stream for emitting failed events that could not be processed "streamName": "bad" diff --git a/config/config.azure.reference.hocon b/config/config.azure.reference.hocon index 3e38cfe0..35c3511c 100644 --- a/config/config.azure.reference.hocon +++ b/config/config.azure.reference.hocon @@ -31,15 +31,13 @@ # -- For Azure blob storage, the uri should start with `abfs://` "location": "abfs://snowplow@example.dfs.core.windows.net/events" - # -- Atomic columns which should be brought to the "left-hand-side" of the events table, to - # -- enable Delta's Data Skipping feature.
- # -- Default is the following important Snowplow timestamps: - "dataSkippingColumns": [ - "load_tstamp" - "collector_tstamp" - "derived_tstamp" - "dvce_created_tstamp" - ] + # -- Any valid Delta table property + # -- This can be blank in most setups because the loader already sets sensible defaults. + "deltaTableProperties": { + "delta.dataSkippingStatsColumns": "load_tstamp,collector_tstamp,derived_tstamp,dvce_created_tstamp" + "delta.checkpointInterval": "50" + } + } ## -- HUDI OUTPUT FORMAT -- ## @@ -58,27 +56,10 @@ # "hoodie.metadata.index.column.stats.column.list": "load_tstamp,collector_tstamp,derived_tstamp,dvce_created_tstamp" # } # -# # -- Any valid hudi table option -# "hudiWriteOptions": { +# # -- Any valid hudi table property +# "hudiTableProperties": { # "hoodie.keygen.timebased.output.dateformat": "yyyy-MM-dd" # } -# } - - ## -- ICEBERG OUTPUT FORMAT, HADOOP CATALOG -- ## -# "good": { -# -# # -- Tell the loader to use Iceberg + Hadoop -# "type": "IcebergHadoop" -# -# # -- URI of the bucket where the data lake will be written (required) -# # -- For Azure blob storage, the uri should start with `abfs://` -# "location": "abfs://snowplow@example.dfs.core.windows.net/events" -# -# # -- Name of the database in the hadoop catalog (required) -# "database": "snowplow" -# -# # -- Name of the table in the hadoop catalog (required) -# "table": "events" # } ## -- ICEBERG OUTPUT FORMAT -- ## @@ -107,6 +88,12 @@ # "cache-enabled": "true" # } # } +# +# # -- Any valid Iceberg table property +# # -- This can be blank in most setups because the loader already sets sensible defaults. +# "icebergTableProperties": { +# "write.metadata.metrics.column.event_id": "count" +# } # } "bad": { diff --git a/config/config.gcp.reference.hocon b/config/config.gcp.reference.hocon index 7a1c9c58..c897283a 100644 --- a/config/config.gcp.reference.hocon +++ b/config/config.gcp.reference.hocon @@ -10,8 +10,9 @@ # -- Pubsub subscription for the source of enriched events (required) "subscription": "projects/myproject/subscriptions/snowplow-enriched" - # -- How many threads are used internally by the pubsub client library for fetching events - "parallelPullCount": 3 + # -- Controls how many threads are used internally by the pubsub client library for fetching events. + # -- The number of threads is equal to this factor multiplied by the number of available cpu cores + "parallelPullFactor": 0.5 # -- How many bytes can be buffered by the loader app before blocking the pubsub client library # -- from fetching more events. @@ -32,15 +33,13 @@ # -- For a GCS bucket, the uri should start with `gs://` "location": "gs://my-bucket/events" - # -- Atomic columns which should be brought to the "left-hand-side" of the events table, to - # -- enable Delta's Data Skipping feature. - # -- The Delta table property `delta.dataSkippingNumIndexedCols` will be set to the size of the list - "dataSkippingColumns": [ - "load_tstamp" - "collector_tstamp" - "derived_tstamp" - "dvce_created_tstamp" - ] + # -- Any valid Delta table property + # -- This can be blank in most setups because the loader already sets sensible defaults.
+ "deltaTableProperties": { + "delta.dataSkippingStatsColumns": "load_tstamp,collector_tstamp,derived_tstamp,dvce_created_tstamp" + "delta.checkpointInterval": "50" + } + } ## -- HUDI OUTPUT FORMAT -- ## @@ -59,8 +58,8 @@ # "hoodie.metadata.index.column.stats.column.list": "load_tstamp,collector_tstamp,derived_tstamp,dvce_created_tstamp" # } # -# # -- Any valid hudi table option -# "hudiWriteOptions": { +# # -- Any valid hudi table property +# "hudiTableProperties": { # "hoodie.keygen.timebased.output.dateformat": "yyyy-MM-dd" # } # } @@ -104,6 +103,12 @@ # "cache-enabled": "true" # } # } +# +# # -- Any valid Iceberg table property +# # -- This can be blank in most setups because the loader already sets sensible defaults. +# "icebergTableProperties": { +# "write.metadata.metrics.column.event_id": "count" +# } # } "bad": { diff --git a/modules/core/src/main/resources/reference.conf b/modules/core/src/main/resources/reference.conf index f8f3b2e5..e38190cf 100644 --- a/modules/core/src/main/resources/reference.conf +++ b/modules/core/src/main/resources/reference.conf @@ -23,29 +23,80 @@ "good": { "type": "Delta" "type": ${?packaging.output.good.type} - "dataSkippingColumns": [ - "load_tstamp" - "collector_tstamp" - "derived_tstamp" - "dvce_created_tstamp" - ] - - "hudiTableOptions": { + + "deltaTableProperties": { + "delta.logRetentionDuration": "interval 1 days" + "delta.dataSkippingStatsColumns": "load_tstamp,collector_tstamp,derived_tstamp,dvce_created_tstamp,true_tstamp" + "delta.checkpointInterval": "50" + } + + "icebergTableProperties": { + "write.spark.accept-any-schema": "true" + "write.object-storage.enabled": "true" + "write.metadata.metrics.max-inferred-column-defaults": "0" + "write.metadata.metrics.column.load_tstamp": "full" + "write.metadata.metrics.column.collector_tstamp": "full" + "write.metadata.metrics.column.derived_tstamp": "full" + "write.metadata.metrics.column.dvce_created_tstamp": "full" + "write.metadata.metrics.column.true_tstamp": "full" + } + + "hudiTableProperties": { "hoodie.table.name": "events" "hoodie.table.keygenerator.class": "org.apache.hudi.keygen.TimestampBasedKeyGenerator" "hoodie.table.partition.fields": "load_tstamp" + "hoodie.table.precombine.field": "load_tstamp" "hoodie.keygen.timebased.timestamp.scalar.time.unit": "microseconds" "hoodie.keygen.timebased.output.dateformat": "yyyy-MM-dd" } "hudiWriteOptions": { + # -- This loader works most efficiently with BULK_INSERT instead of INSERT + "hoodie.datasource.write.operation": "BULK_INSERT" + + # -- Record key and partition settings. Chosen to be consistent with `hudiTableOptions`. "hoodie.keygen.timebased.timestamp.type": "SCALAR" - "hoodie.datasource.write.operation": "INSERT" "hoodie.datasource.write.reconcile.schema": "true" "hoodie.datasource.write.partitionpath.field": "load_tstamp" "hoodie.schema.on.read.enable": "true" - "hoodie.metadata.index.column.stats.column.list": "load_tstamp,collector_tstamp,derived_tstamp,dvce_created_tstamp" - // "hoodie.embed.timeline.server.reuse.enabled": "true" // TODO: Experiment with this. 
+ "hoodie.metadata.index.column.stats.column.list": "load_tstamp,collector_tstamp,derived_tstamp,dvce_created_tstamp,true_tstamp" + "hoodie.metadata.index.column.stats.enable": "true" + "hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled": "true" + + # -- Configures how Hudi manages the timeline + "hoodie.embed.timeline.server": "true" + "hoodie.embed.timeline.server.reuse.enabled": "true" + "hoodie.filesystem.view.incr.timeline.sync.enable": "true" + "hoodie.filesystem.view.type": "SPILLABLE_DISK" + + # -- Hive sync is disabled by default. But if someone does enable it then these are helpful settings: + "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor" + "hoodie.datasource.hive_sync.partition_fields": "load_tstamp_date" + "hoodie.datasource.hive_sync.support_timestamp": "true" + + # -- Clustering: Every 4 commits, rewrite the latest parquet files to boost their size. + "hoodie.clustering.inline": "true" + "hoodie.clustering.plan.partition.filter.mode": "RECENT_DAYS" + "hoodie.clustering.plan.strategy.daybased.lookback.partitions": "1" + "hoodie.clustering.plan.strategy.target.file.max.bytes": "500000000" # 500 MB + "hoodie.clustering.plan.strategy.small.file.limit": "100000000" # 100 MB + "hoodie.clustering.inline.max.commits": "4" # Should be smaller than the ratio `target.file.max.bytes` / `small.file.limit` + "hoodie.clustering.plan.strategy.single.group.clustering.enabled": "false" + + # -- Parallelism. This loader works best when we prevent Hudi from ever running >1 task at a time + "hoodie.file.listing.parallelism": "1" + "hoodie.metadata.index.bloom.filter.parallelism": "1" + "hoodie.metadata.index.column.stats.parallelism": "1" + "hoodie.clustering.max.parallelism": "1" + "hoodie.finalize.write.parallelism": "1" + "hoodie.markers.delete.parallelism": "1" + "hoodie.rollback.parallelism": "1" + "hoodie.upsert.shuffle.parallelism": "1" + "hoodie.bloom.index.parallelism": "1" + "hoodie.insert.shuffle.parallelism": "1" + "hoodie.archive.delete.parallelism": "1" + "hoodie.cleaner.parallelism": "1" + "hoodie.clustering.plan.strategy.max.num.groups": "1" } "catalog": { @@ -69,6 +120,7 @@ "spark.sql.parquet.outputTimestampType": "TIMESTAMP_MICROS" "spark.sql.parquet.datetimeRebaseModeInWrite": "CORRECTED" "spark.memory.storageFraction": "0" + "spark.databricks.delta.autoCompact.enabled": "false" } "gcpUserAgent": ${gcpUserAgent} "writerParallelismFraction": 0.5 diff --git a/modules/core/src/main/resources/simplelogger.properties b/modules/core/src/main/resources/simplelogger.properties index d9938183..ba7ec888 100644 --- a/modules/core/src/main/resources/simplelogger.properties +++ b/modules/core/src/main/resources/simplelogger.properties @@ -2,3 +2,4 @@ org.slf4j.simpleLogger.defaultLogLevel=warn org.slf4j.simpleLogger.log.com.snowplowanalytics=info org.slf4j.simpleLogger.log.org.apache.spark.scheduler.TaskSetManager=error org.slf4j.simpleLogger.log.org.apache.spark.sql.delta=info +org.slf4j.simpleLogger.log.org.apache.iceberg.metrics.LoggingMetricsReporter=info diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala index d6e12a57..a2b4fd81 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala @@ -44,26 +44,31 @@ object Config { case class WithIglu[+Source, +Sink](main: 
Config[Source, Sink], iglu: ResolverConfig) - case class Output[+Sink](good: Target, bad: Sink) + case class Output[+Sink](good: Target, bad: SinkWithMaxSize[Sink]) + + case class SinkWithMaxSize[+Sink](sink: Sink, maxRecordSize: Int) + + case class MaxRecordSize(maxRecordSize: Int) sealed trait Target case class Delta( location: URI, - dataSkippingColumns: List[String] + deltaTableProperties: Map[String, String] ) extends Target case class Hudi( location: URI, hudiWriteOptions: Map[String, String], - hudiTableOptions: Map[String, String] + hudiTableProperties: Map[String, String] ) extends Target case class Iceberg( database: String, table: String, catalog: IcebergCatalog, - location: URI + location: URI, + icebergTableProperties: Map[String, String] ) extends Target sealed trait IcebergCatalog @@ -113,7 +118,11 @@ object Config { ) implicit def decoder[Source: Decoder, Sink: Decoder]: Decoder[Config[Source, Sink]] = { - implicit val configuration = Configuration.default.withDiscriminator("type") + implicit val configuration = Configuration.default.withDiscriminator("type") + implicit val sinkWithMaxSize = for { + sink <- Decoder[Sink] + maxSize <- deriveConfiguredDecoder[MaxRecordSize] + } yield SinkWithMaxSize(sink, maxSize.maxRecordSize) implicit val icebergCatalog = deriveConfiguredDecoder[IcebergCatalog] implicit val target = deriveConfiguredDecoder[Target] implicit val output = deriveConfiguredDecoder[Output[Sink]] diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Environment.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Environment.scala index ba4c2ea8..b97024f2 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Environment.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Environment.scala @@ -51,6 +51,7 @@ case class Environment[F[_]]( cpuParallelism: Int, inMemBatchBytes: Long, windowing: EventProcessingConfig.TimedWindows, + badRowMaxSize: Int, schemasToSkip: List[SchemaCriterion] ) @@ -66,7 +67,7 @@ object Environment { _ <- enableSentry[F](appInfo, config.main.monitoring.sentry) resolver <- mkResolver[F](config.iglu) httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource - badSink <- toSink(config.main.output.bad) + badSink <- toSink(config.main.output.bad.sink) windowing <- Resource.eval(EventProcessingConfig.TimedWindows.build(config.main.windowing, config.main.numEagerWindows)) (lakeWriter, lakeWriterHealth) <- LakeWriter.build[F](config.main.spark, config.main.output.good) sourceAndAck <- Resource.eval(toSource(config.main.input)) @@ -85,6 +86,7 @@ object Environment { cpuParallelism = cpuParallelism, inMemBatchBytes = config.main.inMemBatchBytes, windowing = windowing, + badRowMaxSize = config.main.output.bad.maxRecordSize, schemasToSkip = config.main.skipSchemas ) diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Run.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Run.scala index 776471c3..9accace1 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Run.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Run.scala @@ -11,12 +11,15 @@ package com.snowplowanalytics.snowplow.lakes import cats.implicits._ -import cats.effect.{Async, ExitCode, Resource, Sync} +import cats.effect.implicits._ +import cats.effect.{Async, Deferred, ExitCode, Resource, Sync} +import cats.effect.std.Dispatcher import cats.data.EitherT import 
org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger import io.circe.Decoder import com.monovore.decline.Opts +import sun.misc.{Signal, SignalHandler} import com.snowplowanalytics.snowplow.sources.SourceAndAck import com.snowplowanalytics.snowplow.sinks.Sink @@ -36,10 +39,14 @@ object Run { ): Opts[F[ExitCode]] = { val configPathOpt = Opts.option[Path]("config", help = "path to config file") val igluPathOpt = Opts.option[Path]("iglu-config", help = "path to iglu resolver config file") - (configPathOpt, igluPathOpt).mapN(fromConfigPaths(appInfo, toSource, toBadSink, _, _)) + (configPathOpt, igluPathOpt).mapN { case (configPath, igluPath) => + fromConfigPaths(appInfo, toSource, toBadSink, configPath, igluPath) + .race(waitForSignal) + .map(_.merge) + } } - private def fromConfigPaths[F[_]: Async, SourceConfig: Decoder, SinkConfig: Decoder]( + def fromConfigPaths[F[_]: Async, SourceConfig: Decoder, SinkConfig: Decoder]( appInfo: AppInfo, toSource: SourceConfig => F[SourceAndAck[F]], toBadSink: SinkConfig => Resource[F, Sink[F]], @@ -81,4 +88,32 @@ object Run { .as(ExitCode.Success) } + /** + * Trap the SIGTERM and begin graceful shutdown + * + * This is needed to prevent 3rd party libraries from starting their own shutdown before we are + * ready + */ + private def waitForSignal[F[_]: Async]: F[ExitCode] = + Dispatcher.sequential(await = true).use { dispatcher => + for { + deferred <- Deferred[F, Int] + _ <- addShutdownHook(deferred, dispatcher) + signal <- deferred.get + } yield ExitCode(128 + signal) + } + + private def addShutdownHook[F[_]: Sync](deferred: Deferred[F, Int], dispatcher: Dispatcher[F]): F[Unit] = + Sync[F].delay { + val handler = new SignalHandler { + override def handle(signal: Signal): Unit = + dispatcher.unsafeRunAndForget { + Logger[F].info(s"Received signal ${signal.getNumber}. 
Cancelling execution.") *> + deferred.complete(signal.getNumber) + } + } + Signal.handle(new Signal("TERM"), handler) + () + } + } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala index 675bb8ed..f71bbb42 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala @@ -34,7 +34,14 @@ import com.snowplowanalytics.snowplow.sinks.ListOfList import com.snowplowanalytics.snowplow.lakes.{Environment, Metrics} import com.snowplowanalytics.snowplow.runtime.processing.BatchUp import com.snowplowanalytics.snowplow.runtime.syntax.foldable._ -import com.snowplowanalytics.snowplow.loaders.transform.{NonAtomicFields, SchemaSubVersion, TabledEntity, Transform, TypedTabledEntity} +import com.snowplowanalytics.snowplow.loaders.transform.{ + BadRowsSerializer, + NonAtomicFields, + SchemaSubVersion, + TabledEntity, + Transform, + TypedTabledEntity +} object Processing { @@ -102,7 +109,7 @@ object Processing { .through(rememberTokens(ref)) .through(incrementReceivedCount(env)) .through(parseBytes(env, badProcessor)) - .through(handleParseFailures(env)) + .through(handleParseFailures(env, badProcessor)) .through(BatchUp.noTimeout(env.inMemBatchBytes)) .through(transformBatch(env, badProcessor, ref)) .through(sinkTransformedBatch(env, ref)) @@ -118,7 +125,7 @@ object Processing { nonAtomicFields <- NonAtomicFields.resolveTypes[F](env.resolver, entities, env.schemasToSkip) _ <- rememberColumnNames(ref, nonAtomicFields.fields) (bad, rows) <- transformToSpark[F](badProcessor, events, nonAtomicFields) - _ <- sendFailedEvents(env, bad) + _ <- sendFailedEvents(env, badProcessor, bad) _ <- ref.update(s => s.copy(numEvents = s.numEvents + rows.size)) } yield Transformed(rows, SparkSchema.forBatch(nonAtomicFields.fields)) } @@ -216,14 +223,22 @@ object Processing { } } - private def handleParseFailures[F[_]: Applicative, A](env: Environment[F]): Pipe[F, ParseResult, ParseResult] = + private def handleParseFailures[F[_]: Applicative, A]( + env: Environment[F], + badProcessor: BadRowProcessor + ): Pipe[F, ParseResult, ParseResult] = _.evalTap { batch => - sendFailedEvents(env, batch.bad) + sendFailedEvents(env, badProcessor, batch.bad) } - private def sendFailedEvents[F[_]: Applicative, A](env: Environment[F], bad: List[BadRow]): F[Unit] = + private def sendFailedEvents[F[_]: Applicative, A]( + env: Environment[F], + badProcessor: BadRowProcessor, + bad: List[BadRow] + ): F[Unit] = if (bad.nonEmpty) { val serialized = bad.map(_.compactByteArray) + bad.map(badRow => BadRowsSerializer.withMaxSize(badRow, badProcessor, env.badRowMaxSize)) env.metrics.addBad(bad.size) *> env.badSink.sinkSimple(ListOfList.of(List(serialized))) } else Applicative[F].unit diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala index 1805b9b2..2832fef0 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala @@ -20,7 +20,6 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger import org.apache.spark.sql.{DataFrame, Row, SparkSession} import 
org.apache.spark.sql.functions.current_timestamp import org.apache.spark.sql.types.StructType -import org.apache.spark.SnowplowOverrideShutdownHook import com.snowplowanalytics.snowplow.lakes.Config import com.snowplowanalytics.snowplow.lakes.tables.Writer @@ -47,9 +46,7 @@ private[processing] object SparkUtils { val closeLogF = Logger[F].info("Closing the global spark session...") val buildF = Sync[F].delay(builder.getOrCreate()) - Resource - .make(openLogF >> buildF)(s => closeLogF >> Sync[F].blocking(s.close())) <* - SnowplowOverrideShutdownHook.resource[F] + Resource.make(openLogF >> buildF)(s => closeLogF >> Sync[F].blocking(s.close())) } private def sparkConfigOptions(config: Config.Spark, writer: Writer): Map[String, String] = { diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/DeltaWriter.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/DeltaWriter.scala index f263d6ef..7cb58b94 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/DeltaWriter.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/DeltaWriter.scala @@ -16,7 +16,6 @@ import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaConcurrentModificationException} -import org.apache.spark.sql.types.StructField import io.delta.tables.DeltaTable import com.snowplowanalytics.snowplow.lakes.Config @@ -39,9 +38,12 @@ class DeltaWriter(config: Config.Delta) extends Writer { .partitionedBy("load_tstamp_date", "event_name") .location(config.location.toString) .tableName("events_internal_id") // The name does not matter - .property("delta.dataSkippingNumIndexedCols", config.dataSkippingColumns.toSet.size.toString()) - fieldsForCreate(config).foreach(builder.addColumn(_)) + config.deltaTableProperties.foreach { case (k, v) => + builder.property(k, v) + } + + AtomicFields.withLoadTstamp.foreach(f => builder.addColumn(SparkSchema.asSparkField(f))) // This column needs special treatment because of the `generatedAlwaysAs` clause builder.addColumn { @@ -91,20 +93,4 @@ class DeltaWriter(config: Config.Delta) extends Writer { } } - /** - * Ordered spark Fields corresponding to the output of this loader - * - * Includes fields added by the loader, e.g. `load_tstamp` - * - * @param config - * The Delta config, whose `dataSkippingColumn` param tells us which columns must go first in - * the table definition. See Delta's data skipping feature to understand why. 
- */ - private def fieldsForCreate(config: Config.Delta): Iterable[StructField] = { - val (withStats, noStats) = AtomicFields.withLoadTstamp.partition { f => - config.dataSkippingColumns.contains(f.name) - } - (withStats ++ noStats).map(SparkSchema.asSparkField) - } - } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/HudiWriter.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/HudiWriter.scala index d44dc687..6f2dec40 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/HudiWriter.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/HudiWriter.scala @@ -30,15 +30,16 @@ class HudiWriter(config: Config.Hudi) extends Writer { ) override def prepareTable[F[_]: Sync](spark: SparkSession): F[Unit] = { - val tableProps = config.hudiTableOptions + val tableProps = config.hudiTableProperties .map { case (k, v) => s"'$k'='$v'" } .mkString(", ") - val internal_table_name = config.hudiTableOptions.get("hoodie.table.name").getOrElse("events") + val internal_table_name = config.hudiTableProperties.get("hoodie.table.name").getOrElse("events") Logger[F].info(s"Creating Hudi table ${config.location} if it does not already exist...") >> + maybeCreateDatabase[F](spark) *> Sync[F].blocking { spark.sql(s""" CREATE TABLE IF NOT EXISTS $internal_table_name @@ -47,9 +48,31 @@ class HudiWriter(config: Config.Hudi) extends Writer { LOCATION '${config.location}' TBLPROPERTIES($tableProps) """) + + // We call clean/archive during startup because it also triggers rollback of any previously + // failed commits. We want to do the rollbacks early, so that we are immediately + // healthy once we start consuming events. + spark.sql(s""" + CALL run_clean(table => '$internal_table_name') + """) + spark.sql(s""" + CALL archive_commits(table => '$internal_table_name') + """) }.void } + private def maybeCreateDatabase[F[_]: Sync](spark: SparkSession): F[Unit] = + config.hudiWriteOptions.get("hoodie.datasource.hive_sync.database") match { + case Some(db) => + Sync[F].blocking { + // This action does not have any effect beyond the internals of this loader. + // It is required to prevent later exceptions for an unknown database.
+ spark.sql(s"CREATE DATABASE $db") + }.void + case None => + Sync[F].unit + } + override def write[F[_]: Sync](df: DataFrame): F[Unit] = Sync[F].blocking { df.write diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/IcebergWriter.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/IcebergWriter.scala index e655ef83..07c7ac84 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/IcebergWriter.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/IcebergWriter.scala @@ -47,7 +47,7 @@ class IcebergWriter(config: Config.Iceberg) extends Writer { (${SparkSchema.ddlForCreate}) USING ICEBERG PARTITIONED BY (date(load_tstamp), event_name) - TBLPROPERTIES('write.spark.accept-any-schema'='true') + TBLPROPERTIES($tableProps) $locationClause """) }.void @@ -96,4 +96,11 @@ class IcebergWriter(config: Config.Iceberg) extends Writer { ) ++ c.options } + private def tableProps: String = + config.icebergTableProperties + .map { case (k, v) => + s"'$k'='$v'" + } + .mkString(", ") + } diff --git a/modules/core/src/main/scala/org.apache.spark/SnowplowOverrideShutdownHook.scala b/modules/core/src/main/scala/org.apache.spark/SnowplowOverrideShutdownHook.scala deleted file mode 100644 index 4eb3bbfa..00000000 --- a/modules/core/src/main/scala/org.apache.spark/SnowplowOverrideShutdownHook.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright (c) 2014-present Snowplow Analytics Ltd. All rights reserved. - * - * This software is made available by Snowplow Analytics, Ltd., - * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 - * located at https://docs.snowplow.io/limited-use-license-1.0 - * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION - * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. 
- */ - -package org.apache.spark - -import cats.implicits._ -import cats.effect.{Async, Deferred, Sync} -import cats.effect.std.Dispatcher -import cats.effect.kernel.Resource -import org.typelevel.log4cats.Logger -import org.typelevel.log4cats.slf4j.Slf4jLogger - -import org.apache.spark.util.{ShutdownHookManager => SparkShutdownHookManager} -import org.apache.hadoop.util.{ShutdownHookManager => HadoopShutdownHookManager} -import java.util.concurrent.TimeUnit - -/** - * This is needed to interrupt and override Spark's default behaviour of shutting down the - * SparkContext immediately after receiving a SIGINT - * - * We manage our own graceful termination, so Spark's default behaviour gets in our way - */ -object SnowplowOverrideShutdownHook { - - private implicit def logger[F[_]: Sync] = Slf4jLogger.getLogger[F] - - def resource[F[_]: Async]: Resource[F, Unit] = - for { - dispatcher <- Dispatcher.sequential(await = true) - sig <- Resource.make(Deferred[F, Unit])(_.complete(()) >> Async[F].cede) - _ <- Resource.eval(addSparkShutdownHook(dispatcher, sig)) - _ <- Resource.eval(addHadoopShutdownHook(dispatcher, sig)) - } yield () - - private def addSparkShutdownHook[F[_]: Sync](dispatcher: Dispatcher[F], sig: Deferred[F, Unit]): F[Unit] = - Sync[F].delay { - SparkShutdownHookManager.addShutdownHook(999) { () => - try - dispatcher.unsafeRunSync { - Logger[F].info("Interrupted Spark's shutdown hook") >> - sig.get - } - catch { - case _: IllegalStateException => - // Expected if the cats-effect dispatcher is already shut down - () - } - } - }.void - - private def addHadoopShutdownHook[F[_]: Sync](dispatcher: Dispatcher[F], sig: Deferred[F, Unit]): F[Unit] = - Sync[F].delay { - val runnable = new Runnable { - def run(): Unit = - try - dispatcher.unsafeRunSync { - Logger[F].info("Interrupted Hadoop's shutdown hook") >> - sig.get - } - catch { - case _: IllegalStateException => - // Expected if the cats-effect dispatcher is already shut down - () - } - } - HadoopShutdownHookManager.get().addShutdownHook(runnable, 999, Int.MaxValue, TimeUnit.SECONDS) - } - -} diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/MockEnvironment.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/MockEnvironment.scala index c4ae9277..7b61ae32 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/MockEnvironment.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/MockEnvironment.scala @@ -76,6 +76,7 @@ object MockEnvironment { inMemBatchBytes = 1000000L, cpuParallelism = 1, windowing = EventProcessingConfig.TimedWindows(1.minute, 1.0, 1), + badRowMaxSize = 1000000, schemasToSkip = List.empty ) MockEnvironment(state, env) diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/TestConfig.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/TestConfig.scala index 1c6a156b..80ea3de5 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/TestConfig.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/TestConfig.scala @@ -13,6 +13,7 @@ package com.snowplowanalytics.snowplow.lakes import com.typesafe.config.ConfigFactory import cats.implicits._ import io.circe.config.syntax._ +import io.circe.Json import fs2.io.file.Path @@ -27,7 +28,7 @@ object TestConfig { def defaults(target: Target, tmpDir: Path): AnyConfig = ConfigFactory .load(ConfigFactory.parseString(configOverrides(target, tmpDir))) - .as[Config[Option[Unit], Option[Unit]]] match { + 
.as[Config[Option[Unit], Json]] match { case Right(ok) => ok case Left(e) => throw new RuntimeException("Could not load default config for testing", e) } @@ -37,7 +38,7 @@ object TestConfig { target match { case Delta => s""" - $acceptLicense + $commonRequiredConfig output.good: { type: "Delta" location: "$location" @@ -45,7 +46,7 @@ object TestConfig { """ case Hudi => s""" - $acceptLicense + $commonRequiredConfig output.good: { type: "Hudi" location: "$location" @@ -53,7 +54,7 @@ object TestConfig { """ case Iceberg => s""" - $acceptLicense + $commonRequiredConfig output.good: { type: "Iceberg" database: "test" @@ -67,11 +68,14 @@ object TestConfig { } } - private def acceptLicense: String = + private def commonRequiredConfig: String = """ license: { accept: true } + output.bad: { + maxRecordSize: 10000 + } """ } diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/TestSparkEnvironment.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/TestSparkEnvironment.scala index 9ae9362a..1c6f1343 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/TestSparkEnvironment.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/TestSparkEnvironment.scala @@ -44,6 +44,7 @@ object TestSparkEnvironment { inMemBatchBytes = 1000000L, cpuParallelism = 1, windowing = EventProcessingConfig.TimedWindows(1.minute, 1.0, 1), + badRowMaxSize = 1000000, schemasToSkip = List.empty ) diff --git a/packaging/biglake/src/main/resources/reference.conf b/packaging/biglake/src/main/resources/reference.conf index 1bc5e114..78b8c025 100644 --- a/packaging/biglake/src/main/resources/reference.conf +++ b/packaging/biglake/src/main/resources/reference.conf @@ -10,7 +10,10 @@ "packaging": { "output": { "good": { - "type": "IcebergBigLake" + "type": "Iceberg" + "catalog": { + "type": "BigLake" + } } } } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index d5c3206b..9e7cbd7e 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -25,21 +25,22 @@ object Dependencies { val spark35 = "3.5.1" val delta = "3.2.0" val hudi = "0.14.0" - val iceberg = "1.5.1" + val iceberg = "1.5.2" val hadoop = "3.4.0" val gcsConnector = "hadoop3-2.2.17" val biglakeIceberg = "0.1.0" val hive = "3.1.3" // java - val slf4j = "2.0.13" - val azureSdk = "1.11.4" - val sentry = "6.25.2" - val awsSdk1 = "1.12.646" - val awsSdk2 = "2.25.16" // Match common-streams + val slf4j = "2.0.13" + val azureSdk = "1.11.4" + val sentry = "6.25.2" + val awsSdk1 = "1.12.646" + val awsSdk2 = "2.25.16" // Match common-streams + val awsRegistry = "1.1.20" // Snowplow - val streams = "0.7.0-M2" + val streams = "0.7.0" val igluClient = "3.0.0" // Transitive overrides @@ -74,6 +75,7 @@ object Dependencies { val sparkHive34 = "org.apache.spark" %% "spark-hive" % V.spark34 val delta = "io.delta" %% "delta-spark" % V.delta val hudi = "org.apache.hudi" %% "hudi-spark3.4-bundle" % V.hudi + val hudiAws = "org.apache.hudi" % "hudi-aws" % V.hudi val iceberg = "org.apache.iceberg" %% "iceberg-spark-runtime-3.5" % V.iceberg val hadoopClient = "org.apache.hadoop" % "hadoop-client-runtime" % V.hadoop val hadoopAzure = "org.apache.hadoop" % "hadoop-azure" % V.hadoop @@ -82,12 +84,13 @@ object Dependencies { val hiveCommon = "org.apache.hive" % "hive-common" % V.hive // java - val slf4j = "org.slf4j" % "slf4j-simple" % V.slf4j - val azureIdentity = "com.azure" % "azure-identity" % V.azureSdk - val sentry = "io.sentry" % "sentry" % V.sentry - val awsGlue = 
"software.amazon.awssdk" % "glue" % V.awsSdk2 % Runtime - val awsS3 = "software.amazon.awssdk" % "s3" % V.awsSdk2 % Runtime - val awsSts = "software.amazon.awssdk" % "sts" % V.awsSdk2 % Runtime + val slf4j = "org.slf4j" % "slf4j-simple" % V.slf4j + val azureIdentity = "com.azure" % "azure-identity" % V.azureSdk + val sentry = "io.sentry" % "sentry" % V.sentry + val awsGlue = "software.amazon.awssdk" % "glue" % V.awsSdk2 + val awsS3 = "software.amazon.awssdk" % "s3" % V.awsSdk2 + val awsSts = "software.amazon.awssdk" % "sts" % V.awsSdk2 + val awsRegistry = "software.amazon.glue" % "schema-registry-serde" % V.awsRegistry // transitive overrides val protobuf = "com.google.protobuf" % "protobuf-java" % V.protobuf @@ -151,9 +154,9 @@ object Dependencies { kinesis, hadoopAws.exclude("software.amazon.awssdk", "bundle"), awsCore, // Dependency on aws sdk v1 will likely be removed in the next release of hadoop-aws - awsGlue, - awsS3, - awsSts, + awsGlue % Runtime, + awsS3 % Runtime, + awsSts % Runtime, hadoopClient ) ++ commonRuntimeDependencies @@ -183,6 +186,11 @@ object Dependencies { sparkHive34 % Runtime ) + val hudiAwsDependencies = Seq( + hudiAws % Runtime, + awsRegistry % Runtime + ) + val commonExclusions = Seq( ExclusionRule(organization = "org.apache.zookeeper", name = "zookeeper"), ExclusionRule(organization = "org.eclipse.jetty", name = "jetty-client"),