diff --git a/config/config.aws.reference.hocon b/config/config.aws.reference.hocon
index 39ec961..093c673 100644
--- a/config/config.aws.reference.hocon
+++ b/config/config.aws.reference.hocon
@@ -189,6 +189,12 @@
     "iglu:com.acme/skipped4/jsonschema/*-*-*"
   ]

+  # -- Whether the output parquet files should declare nested fields as non-nullable according to the Iglu schema.
+  # -- When true (default), nested fields are nullable only if they are not required fields according to the Iglu schema.
+  # -- When false, all nested fields are defined as nullable in the output table's schemas
+  # -- Set this to false if you use a query engine that dislikes non-nullable nested fields of a nullable struct.
+  "respectIgluNullability": true
+
   "monitoring": {
     "metrics": {

diff --git a/config/config.azure.reference.hocon b/config/config.azure.reference.hocon
index 35c3511..35132e8 100644
--- a/config/config.azure.reference.hocon
+++ b/config/config.azure.reference.hocon
@@ -160,6 +160,12 @@
     "iglu:com.acme/skipped4/jsonschema/*-*-*"
   ]

+  # -- Whether the output parquet files should declare nested fields as non-nullable according to the Iglu schema.
+  # -- When true (default), nested fields are nullable only if they are not required fields according to the Iglu schema.
+  # -- When false, all nested fields are defined as nullable in the output table's schemas
+  # -- Set this to false if you use a query engine that dislikes non-nullable nested fields of a nullable struct.
+  "respectIgluNullability": true
+
   "monitoring": {
     "metrics": {

diff --git a/config/config.gcp.reference.hocon b/config/config.gcp.reference.hocon
index c897283..c77b279 100644
--- a/config/config.gcp.reference.hocon
+++ b/config/config.gcp.reference.hocon
@@ -168,6 +168,12 @@
     "iglu:com.acme/skipped4/jsonschema/*-*-*"
   ]

+  # -- Whether the output parquet files should declare nested fields as non-nullable according to the Iglu schema.
+  # -- When true (default), nested fields are nullable only if they are not required fields according to the Iglu schema.
+  # -- When false, all nested fields are defined as nullable in the output table's schemas
+  # -- Set this to false if you use a query engine that dislikes non-nullable nested fields of a nullable struct.
+  "respectIgluNullability": true
+
   "monitoring": {
     "metrics": {

diff --git a/modules/core/src/main/resources/reference.conf b/modules/core/src/main/resources/reference.conf
index e38190c..b570487 100644
--- a/modules/core/src/main/resources/reference.conf
+++ b/modules/core/src/main/resources/reference.conf
@@ -127,6 +127,7 @@
   }

   "skipSchemas": []
+  "respectIgluNullability": true

   "monitoring": {
     "metrics": {
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala
index a2b4fd8..b441ce0 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala
@@ -37,7 +37,8 @@ case class Config[+Source, +Sink](
   telemetry: Telemetry.Config,
   monitoring: Config.Monitoring,
   license: AcceptedLicense,
-  skipSchemas: List[SchemaCriterion]
+  skipSchemas: List[SchemaCriterion],
+  respectIgluNullability: Boolean
 )

 object Config {
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Environment.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Environment.scala
index feddefa..026a5f5 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Environment.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Environment.scala
@@ -52,7 +52,8 @@ case class Environment[F[_]](
   inMemBatchBytes: Long,
   windowing: EventProcessingConfig.TimedWindows,
   badRowMaxSize: Int,
-  schemasToSkip: List[SchemaCriterion]
+  schemasToSkip: List[SchemaCriterion],
+  respectIgluNullability: Boolean
 )

 object Environment {
@@ -77,19 +78,20 @@
       metrics <- Resource.eval(Metrics.build(config.main.monitoring.metrics))
       cpuParallelism = chooseCpuParallelism(config.main)
     } yield Environment(
-      appInfo         = appInfo,
-      source          = sourceAndAck,
-      badSink         = badSink,
-      resolver        = resolver,
-      httpClient      = httpClient,
-      lakeWriter      = lakeWriterWrapped,
-      metrics         = metrics,
-      appHealth       = appHealth,
-      cpuParallelism  = cpuParallelism,
-      inMemBatchBytes = config.main.inMemBatchBytes,
-      windowing       = windowing,
-      badRowMaxSize   = config.main.output.bad.maxRecordSize,
-      schemasToSkip   = config.main.skipSchemas
+      appInfo                = appInfo,
+      source                 = sourceAndAck,
+      badSink                = badSink,
+      resolver               = resolver,
+      httpClient             = httpClient,
+      lakeWriter             = lakeWriterWrapped,
+      metrics                = metrics,
+      appHealth              = appHealth,
+      cpuParallelism         = cpuParallelism,
+      inMemBatchBytes        = config.main.inMemBatchBytes,
+      windowing              = windowing,
+      badRowMaxSize          = config.main.output.bad.maxRecordSize,
+      schemasToSkip          = config.main.skipSchemas,
+      respectIgluNullability = config.main.respectIgluNullability
     )

   private def enableSentry[F[_]: Sync](appInfo: AppInfo, config: Option[Config.Sentry]): Resource[F, Unit] =
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala
index ef14a3e..573389b 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala
@@ -127,7 +127,7 @@
       (bad, rows) <- transformToSpark[F](badProcessor, events, nonAtomicFields)
       _ <- sendFailedEvents(env, badProcessor, bad)
       _ <- ref.update(s => s.copy(numEvents = s.numEvents + rows.size))
-    } yield Transformed(rows, SparkSchema.forBatch(nonAtomicFields.fields))
+    } yield Transformed(rows, SparkSchema.forBatch(nonAtomicFields.fields, env.respectIgluNullability))
   }

   private def sinkTransformedBatch[F[_]: RegistryLookup: Sync](
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkSchema.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkSchema.scala
index b06cce4..434ebe1 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkSchema.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkSchema.scala
@@ -22,11 +22,11 @@
    *
    * The returned schema includes atomic fields and non-atomic fields but not the load_tstamp column
    */
-  private[processing] def forBatch(entities: Vector[TypedTabledEntity]): StructType = {
+  private[processing] def forBatch(entities: Vector[TypedTabledEntity], respectIgluNullability: Boolean): StructType = {
     val nonAtomicFields = entities.flatMap { tte =>
       tte.mergedField :: tte.recoveries.map(_._2)
     }
-    StructType(atomic ++ nonAtomicFields.map(asSparkField))
+    StructType(atomic ++ nonAtomicFields.map(asSparkField(_, respectIgluNullability)))
   }

   /**
@@ -37,29 +37,30 @@
    * @note
    *   this is a `val` not a `def` because we use it over and over again.
    */
-  val atomic: Vector[StructField] = AtomicFields.static.map(asSparkField)
+  val atomic: Vector[StructField] = AtomicFields.static.map(asSparkField(_, true))

   /** String representation of the atomic schema for creating a table using SQL dialiect */
   def ddlForCreate: String =
-    StructType(AtomicFields.withLoadTstamp.map(asSparkField)).toDDL
+    StructType(AtomicFields.withLoadTstamp.map(asSparkField(_, true))).toDDL

-  def asSparkField(ddlField: Field): StructField = {
+  def asSparkField(ddlField: Field, respectIgluNullability: Boolean): StructField = {
     val normalizedName = Field.normalize(ddlField).name
-    val dataType = fieldType(ddlField.fieldType)
-    StructField(normalizedName, dataType, ddlField.nullability.nullable)
+    val dataType = fieldType(ddlField.fieldType, respectIgluNullability)
+    StructField(normalizedName, dataType, !respectIgluNullability || ddlField.nullability.nullable)
   }

-  private def fieldType(ddlType: Type): DataType = ddlType match {
-    case Type.String                        => StringType
-    case Type.Boolean                       => BooleanType
-    case Type.Integer                       => IntegerType
-    case Type.Long                          => LongType
-    case Type.Double                        => DoubleType
-    case Type.Decimal(precision, scale)     => DecimalType(Type.DecimalPrecision.toInt(precision), scale)
-    case Type.Date                          => DateType
-    case Type.Timestamp                     => TimestampType
-    case Type.Struct(fields)                => StructType(fields.toVector.map(asSparkField))
-    case Type.Array(element, elNullability) => ArrayType(fieldType(element), elNullability.nullable)
-    case Type.Json                          => StringType // Spark does not support the `Json` parquet logical type.
+  private def fieldType(ddlType: Type, respectIgluNullability: Boolean): DataType = ddlType match {
+    case Type.String                    => StringType
+    case Type.Boolean                   => BooleanType
+    case Type.Integer                   => IntegerType
+    case Type.Long                      => LongType
+    case Type.Double                    => DoubleType
+    case Type.Decimal(precision, scale) => DecimalType(Type.DecimalPrecision.toInt(precision), scale)
+    case Type.Date                      => DateType
+    case Type.Timestamp                 => TimestampType
+    case Type.Struct(fields)            => StructType(fields.toVector.map(asSparkField(_, respectIgluNullability)))
+    case Type.Array(element, elNullability) =>
+      ArrayType(fieldType(element, respectIgluNullability), !respectIgluNullability || elNullability.nullable)
+    case Type.Json                      => StringType // Spark does not support the `Json` parquet logical type.
   }
 }
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/DeltaWriter.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/DeltaWriter.scala
index 7cb58b9..e6b8885 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/DeltaWriter.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/DeltaWriter.scala
@@ -43,7 +43,7 @@
         builder.property(k, v)
       }

-      AtomicFields.withLoadTstamp.foreach(f => builder.addColumn(SparkSchema.asSparkField(f)))
+      AtomicFields.withLoadTstamp.foreach(f => builder.addColumn(SparkSchema.asSparkField(f, true)))

       // This column needs special treatment because of the `generatedAlwaysAs` clause
       builder.addColumn {
diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/MockEnvironment.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/MockEnvironment.scala
index a5cec21..355f75c 100644
--- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/MockEnvironment.scala
+++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/MockEnvironment.scala
@@ -69,19 +69,20 @@
       _ <- appHealth.setServiceHealth(AppHealth.Service.BadSink, isHealthy = true)
     } yield {
       val env = Environment(
-        appInfo         = TestSparkEnvironment.appInfo,
-        source          = source,
-        badSink         = testSink(state),
-        resolver        = Resolver[IO](Nil, None),
-        httpClient      = testHttpClient,
-        lakeWriter      = testLakeWriter(state),
-        metrics         = testMetrics(state),
-        appHealth       = appHealth,
-        inMemBatchBytes = 1000000L,
-        cpuParallelism  = 1,
-        windowing       = EventProcessingConfig.TimedWindows(1.minute, 1.0, 1),
-        badRowMaxSize   = 1000000,
-        schemasToSkip   = List.empty
+        appInfo                = TestSparkEnvironment.appInfo,
+        source                 = source,
+        badSink                = testSink(state),
+        resolver               = Resolver[IO](Nil, None),
+        httpClient             = testHttpClient,
+        lakeWriter             = testLakeWriter(state),
+        metrics                = testMetrics(state),
+        appHealth              = appHealth,
+        inMemBatchBytes        = 1000000L,
+        cpuParallelism         = 1,
+        windowing              = EventProcessingConfig.TimedWindows(1.minute, 1.0, 1),
+        badRowMaxSize          = 1000000,
+        schemasToSkip          = List.empty,
+        respectIgluNullability = true
       )
       MockEnvironment(state, env)
     }
diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/TestSparkEnvironment.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/TestSparkEnvironment.scala
index f232801..082f3db 100644
--- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/TestSparkEnvironment.scala
+++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/TestSparkEnvironment.scala
@@ -38,19 +38,20 @@
       lakeWriter <- LakeWriter.build[IO](testConfig.spark, testConfig.output.good)
       lakeWriterWrapped = LakeWriter.withHandledErrors(lakeWriter, appHealth)
     } yield Environment(
-      appInfo         = appInfo,
-      source          = source,
-      badSink         = Sink[IO](_ => IO.unit),
-      resolver        = Resolver[IO](Nil, None),
-      httpClient      = testHttpClient,
-      lakeWriter      = lakeWriterWrapped,
-      metrics         = testMetrics,
-      appHealth       = appHealth,
-      inMemBatchBytes = 1000000L,
-      cpuParallelism  = 1,
-      windowing       = EventProcessingConfig.TimedWindows(1.minute, 1.0, 1),
-      badRowMaxSize   = 1000000,
-      schemasToSkip   = List.empty
+      appInfo                = appInfo,
+      source                 = source,
+      badSink                = Sink[IO](_ => IO.unit),
+      resolver               = Resolver[IO](Nil, None),
+      httpClient             = testHttpClient,
+      lakeWriter             = lakeWriterWrapped,
+      metrics                = testMetrics,
+      appHealth              = appHealth,
+      inMemBatchBytes        = 1000000L,
+      cpuParallelism         = 1,
+      windowing              = EventProcessingConfig.TimedWindows(1.minute, 1.0, 1),
+      badRowMaxSize          = 1000000,
+      schemasToSkip          = List.empty,
+      respectIgluNullability = true
     )

   private def testSourceAndAck(windows: List[List[TokenedEvents]]): SourceAndAck[IO] =
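For context, a minimal sketch (not part of the diff above) of what the new respectIgluNullability flag changes in the Spark schema the loader writes. The entity column and field names below are invented for illustration, and only the standard Spark StructType / StructField API is used; it is not the loader's actual code path.

import org.apache.spark.sql.types._

object RespectIgluNullabilityIllustration {

  // respectIgluNullability = true (the default): a nested field that the Iglu schema
  // marks as "required" is declared non-nullable inside the (still nullable) entity struct.
  val strict: StructType = StructType(Seq(
    StructField(
      "contexts_com_acme_example_1", // hypothetical self-describing entity column
      StructType(Seq(
        StructField("required_field", StringType, nullable = false),
        StructField("optional_field", StringType, nullable = true)
      )),
      nullable = true
    )
  ))

  // respectIgluNullability = false: every nested field is declared nullable, for query
  // engines that reject non-nullable fields inside a nullable struct.
  val lenient: StructType = StructType(Seq(
    StructField(
      "contexts_com_acme_example_1",
      StructType(Seq(
        StructField("required_field", StringType, nullable = true),
        StructField("optional_field", StringType, nullable = true)
      )),
      nullable = true
    )
  ))
}

In both cases the entity column itself remains nullable; the flag only controls whether the Iglu "required" hints are carried through to the nullability of the fields nested inside it.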