Allow disregarding Iglu field's nullability when creating output columns
istreeter committed Jun 20, 2024
1 parent 7b71eb5 commit 6d1cd92
Showing 11 changed files with 78 additions and 59 deletions.
4 changes: 4 additions & 0 deletions config/config.aws.reference.hocon
@@ -189,6 +189,10 @@
"iglu:com.acme/skipped4/jsonschema/*-*-*"
]

+ # -- Whether the output parquet files should declare nested fields as non-nullable according to the Iglu schema.
+ # -- Set this to false if you use a query engine that dislikes non-nullable nested fields of a nullable struct.
+ "respectNullability": true

"monitoring": {
"metrics": {

4 changes: 4 additions & 0 deletions config/config.azure.reference.hocon
@@ -160,6 +160,10 @@
"iglu:com.acme/skipped4/jsonschema/*-*-*"
]

+ # -- Whether the output parquet files should declare nested fields as non-nullable according to the Iglu schema.
+ # -- Set this to false if you use a query engine that dislikes non-nullable nested fields of a nullable struct.
+ "respectNullability": true

"monitoring": {
"metrics": {

4 changes: 4 additions & 0 deletions config/config.gcp.reference.hocon
@@ -168,6 +168,10 @@
"iglu:com.acme/skipped4/jsonschema/*-*-*"
]

+ # -- Whether the output parquet files should declare nested fields as non-nullable according to the Iglu schema.
+ # -- Set this to false if you use a query engine that dislikes non-nullable nested fields of a nullable struct.
+ "respectNullability": true

"monitoring": {
"metrics": {

1 change: 1 addition & 0 deletions modules/core/src/main/resources/reference.conf
@@ -127,6 +127,7 @@
}

"skipSchemas": []
"respectNullability": true

"monitoring": {
"metrics": {
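All four configuration files gain the same top-level option, defaulting to true. For illustration only (not part of this commit), a minimal sketch of a loader config that opts out of strict nullability; the surrounding braces and the skipSchemas key are shown purely for context:

```hocon
{
  # ... rest of the loader configuration ...

  "skipSchemas": []

  # Set to false for query engines that cannot handle non-nullable
  # nested fields of a nullable struct.
  "respectNullability": false
}
```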
@@ -37,7 +37,8 @@ case class Config[+Source, +Sink](
telemetry: Telemetry.Config,
monitoring: Config.Monitoring,
license: AcceptedLicense,
- skipSchemas: List[SchemaCriterion]
+ skipSchemas: List[SchemaCriterion],
+ respectNullability: Boolean
)

object Config {
@@ -52,7 +52,8 @@ case class Environment[F[_]](
inMemBatchBytes: Long,
windowing: EventProcessingConfig.TimedWindows,
badRowMaxSize: Int,
- schemasToSkip: List[SchemaCriterion]
+ schemasToSkip: List[SchemaCriterion],
+ respectNullability: Boolean
)

object Environment {
@@ -76,18 +77,19 @@ object Environment {
_ <- HealthProbe.resource(config.main.monitoring.healthProbe.port, isHealthy)
cpuParallelism = chooseCpuParallelism(config.main)
} yield Environment(
- appInfo = appInfo,
- source = sourceAndAck,
- badSink = badSink,
- resolver = resolver,
- httpClient = httpClient,
- lakeWriter = lakeWriter,
- metrics = metrics,
- cpuParallelism = cpuParallelism,
- inMemBatchBytes = config.main.inMemBatchBytes,
- windowing = windowing,
- badRowMaxSize = config.main.output.bad.maxRecordSize,
- schemasToSkip = config.main.skipSchemas
+ appInfo = appInfo,
+ source = sourceAndAck,
+ badSink = badSink,
+ resolver = resolver,
+ httpClient = httpClient,
+ lakeWriter = lakeWriter,
+ metrics = metrics,
+ cpuParallelism = cpuParallelism,
+ inMemBatchBytes = config.main.inMemBatchBytes,
+ windowing = windowing,
+ badRowMaxSize = config.main.output.bad.maxRecordSize,
+ schemasToSkip = config.main.skipSchemas,
+ respectNullability = config.main.respectNullability
)

private def enableSentry[F[_]: Sync](appInfo: AppInfo, config: Option[Config.Sentry]): Resource[F, Unit] =
@@ -127,7 +127,7 @@ object Processing {
(bad, rows) <- transformToSpark[F](badProcessor, events, nonAtomicFields)
_ <- sendFailedEvents(env, badProcessor, bad)
_ <- ref.update(s => s.copy(numEvents = s.numEvents + rows.size))
- } yield Transformed(rows, SparkSchema.forBatch(nonAtomicFields.fields))
+ } yield Transformed(rows, SparkSchema.forBatch(nonAtomicFields.fields, env.respectNullability))
}

private def sinkTransformedBatch[F[_]: RegistryLookup: Sync](
@@ -22,11 +22,11 @@ object SparkSchema {
*
* The returned schema includes atomic fields and non-atomic fields but not the load_tstamp column
*/
- private[processing] def forBatch(entities: Vector[TypedTabledEntity]): StructType = {
+ private[processing] def forBatch(entities: Vector[TypedTabledEntity], respectNullability: Boolean): StructType = {
val nonAtomicFields = entities.flatMap { tte =>
tte.mergedField :: tte.recoveries.map(_._2)
}
- StructType(atomic ++ nonAtomicFields.map(asSparkField))
+ StructType(atomic ++ nonAtomicFields.map(asSparkField(_, respectNullability)))
}

/**
@@ -37,29 +37,30 @@
* @note
* this is a `val` not a `def` because we use it over and over again.
*/
- val atomic: Vector[StructField] = AtomicFields.static.map(asSparkField)
+ val atomic: Vector[StructField] = AtomicFields.static.map(asSparkField(_, true))

/** String representation of the atomic schema for creating a table using SQL dialect */
def ddlForCreate: String =
- StructType(AtomicFields.withLoadTstamp.map(asSparkField)).toDDL
+ StructType(AtomicFields.withLoadTstamp.map(asSparkField(_, true))).toDDL

- def asSparkField(ddlField: Field): StructField = {
+ def asSparkField(ddlField: Field, respectNullability: Boolean): StructField = {
val normalizedName = Field.normalize(ddlField).name
- val dataType = fieldType(ddlField.fieldType)
- StructField(normalizedName, dataType, ddlField.nullability.nullable)
+ val dataType = fieldType(ddlField.fieldType, respectNullability)
+ StructField(normalizedName, dataType, respectNullability && ddlField.nullability.nullable)
}

- private def fieldType(ddlType: Type): DataType = ddlType match {
-   case Type.String                        => StringType
-   case Type.Boolean                       => BooleanType
-   case Type.Integer                       => IntegerType
-   case Type.Long                          => LongType
-   case Type.Double                        => DoubleType
-   case Type.Decimal(precision, scale)     => DecimalType(Type.DecimalPrecision.toInt(precision), scale)
-   case Type.Date                          => DateType
-   case Type.Timestamp                     => TimestampType
-   case Type.Struct(fields)                => StructType(fields.toVector.map(asSparkField))
-   case Type.Array(element, elNullability) => ArrayType(fieldType(element), elNullability.nullable)
-   case Type.Json                          => StringType // Spark does not support the `Json` parquet logical type.
+ private def fieldType(ddlType: Type, respectNullability: Boolean): DataType = ddlType match {
+   case Type.String                    => StringType
+   case Type.Boolean                   => BooleanType
+   case Type.Integer                   => IntegerType
+   case Type.Long                      => LongType
+   case Type.Double                    => DoubleType
+   case Type.Decimal(precision, scale) => DecimalType(Type.DecimalPrecision.toInt(precision), scale)
+   case Type.Date                      => DateType
+   case Type.Timestamp                 => TimestampType
+   case Type.Struct(fields)            => StructType(fields.toVector.map(asSparkField(_, respectNullability)))
+   case Type.Array(element, elNullability) =>
+     ArrayType(fieldType(element, respectNullability), respectNullability && elNullability.nullable)
+   case Type.Json                      => StringType // Spark does not support the `Json` parquet logical type.
}
}
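The config comment above describes the intent of the new flag: with it disabled, the output schema should no longer contain non-nullable nested fields inside a nullable struct. The sketch below is not part of the commit; it uses plain Spark types and hypothetical column names to show the two schema shapes involved: the strict shape that some query engines reject, and the relaxed shape the option is meant to allow.

```scala
import org.apache.spark.sql.types._

// Strict shape: the entity column is nullable (the entity may be absent from an
// event), but its required sub-field is declared non-nullable -- the combination
// some query engines cannot read.
val strict = StructType(Seq(
  StructField(
    "contexts_com_acme_example_1", // hypothetical entity column
    StructType(Seq(StructField("required_field", StringType, nullable = false))),
    nullable = true
  )
))

// Relaxed shape: every nested field is declared nullable, so the problematic
// "required field inside an optional struct" pattern never appears.
val relaxed = StructType(Seq(
  StructField(
    "contexts_com_acme_example_1",
    StructType(Seq(StructField("required_field", StringType, nullable = true))),
    nullable = true
  )
))
```

Whether a given engine accepts the strict shape is engine-specific; the option lets operators choose the relaxed shape without changing their Iglu schemas.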
@@ -43,7 +43,7 @@ class DeltaWriter(config: Config.Delta) extends Writer {
builder.property(k, v)
}

- AtomicFields.withLoadTstamp.foreach(f => builder.addColumn(SparkSchema.asSparkField(f)))
+ AtomicFields.withLoadTstamp.foreach(f => builder.addColumn(SparkSchema.asSparkField(f, true)))

// This column needs special treatment because of the `generatedAlwaysAs` clause
builder.addColumn {
@@ -66,18 +66,19 @@ object MockEnvironment {
state <- Ref[IO].of(Vector.empty[Action])
} yield {
val env = Environment(
- appInfo = TestSparkEnvironment.appInfo,
- source = testSourceAndAck(windows, state),
- badSink = testSink(state),
- resolver = Resolver[IO](Nil, None),
- httpClient = testHttpClient,
- lakeWriter = testLakeWriter(state),
- metrics = testMetrics(state),
- inMemBatchBytes = 1000000L,
- cpuParallelism = 1,
- windowing = EventProcessingConfig.TimedWindows(1.minute, 1.0, 1),
- badRowMaxSize = 1000000,
- schemasToSkip = List.empty
+ appInfo = TestSparkEnvironment.appInfo,
+ source = testSourceAndAck(windows, state),
+ badSink = testSink(state),
+ resolver = Resolver[IO](Nil, None),
+ httpClient = testHttpClient,
+ lakeWriter = testLakeWriter(state),
+ metrics = testMetrics(state),
+ inMemBatchBytes = 1000000L,
+ cpuParallelism = 1,
+ windowing = EventProcessingConfig.TimedWindows(1.minute, 1.0, 1),
+ badRowMaxSize = 1000000,
+ schemasToSkip = List.empty,
+ respectNullability = true
)
MockEnvironment(state, env)
}
@@ -34,18 +34,19 @@ object TestSparkEnvironment {
testConfig <- Resource.pure(TestConfig.defaults(target, tmpDir))
(lakeWriter, _) <- LakeWriter.build[IO](testConfig.spark, testConfig.output.good)
} yield Environment(
- appInfo = appInfo,
- source = testSourceAndAck(windows),
- badSink = Sink[IO](_ => IO.unit),
- resolver = Resolver[IO](Nil, None),
- httpClient = testHttpClient,
- lakeWriter = lakeWriter,
- metrics = testMetrics,
- inMemBatchBytes = 1000000L,
- cpuParallelism = 1,
- windowing = EventProcessingConfig.TimedWindows(1.minute, 1.0, 1),
- badRowMaxSize = 1000000,
- schemasToSkip = List.empty
+ appInfo = appInfo,
+ source = testSourceAndAck(windows),
+ badSink = Sink[IO](_ => IO.unit),
+ resolver = Resolver[IO](Nil, None),
+ httpClient = testHttpClient,
+ lakeWriter = lakeWriter,
+ metrics = testMetrics,
+ inMemBatchBytes = 1000000L,
+ cpuParallelism = 1,
+ windowing = EventProcessingConfig.TimedWindows(1.minute, 1.0, 1),
+ badRowMaxSize = 1000000,
+ schemasToSkip = List.empty,
+ respectNullability = true
)

private def testSourceAndAck(windows: List[List[TokenedEvents]]): SourceAndAck[IO] =
