From 359507a85dd76f58b86be8898403f5daa5b5cc91 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Mon, 15 Jun 2020 17:03:43 +0200 Subject: [PATCH] #137: Make column non nullable --- .../ConfluentAvroKafkaStreamDecoder.scala | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/decoder/avro/confluent/ConfluentAvroKafkaStreamDecoder.scala b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/decoder/avro/confluent/ConfluentAvroKafkaStreamDecoder.scala index 2463ab4c..0a4bd4e5 100644 --- a/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/decoder/avro/confluent/ConfluentAvroKafkaStreamDecoder.scala +++ b/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/decoder/avro/confluent/ConfluentAvroKafkaStreamDecoder.scala @@ -18,18 +18,19 @@ package za.co.absa.hyperdrive.ingestor.implementation.decoder.avro.confluent import org.apache.commons.configuration2.Configuration import org.apache.commons.lang3.{RandomStringUtils, StringUtils} import org.apache.logging.log4j.LogManager +import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull import org.apache.spark.sql.functions.col -import org.apache.spark.sql.DataFrame import org.apache.spark.sql.streaming.DataStreamReader +import org.apache.spark.sql.{Column, DataFrame} import za.co.absa.abris.avro.functions.from_confluent_avro import za.co.absa.abris.avro.read.confluent.SchemaManager._ import za.co.absa.hyperdrive.ingestor.api.context.HyperdriveContext import za.co.absa.hyperdrive.ingestor.api.decoder.{StreamDecoder, StreamDecoderFactory} -import za.co.absa.hyperdrive.ingestor.implementation.utils.{SchemaRegistryConsumerConfigKeys, SchemaRegistrySettingsUtil} -import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.AvroKafkaStreamDecoderKeys._ import za.co.absa.hyperdrive.ingestor.api.utils.ConfigUtils import za.co.absa.hyperdrive.ingestor.api.utils.ConfigUtils.getOrThrow import za.co.absa.hyperdrive.ingestor.implementation.HyperdriveContextKeys +import za.co.absa.hyperdrive.ingestor.implementation.utils.{SchemaRegistryConsumerConfigKeys, SchemaRegistrySettingsUtil} +import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.AvroKafkaStreamDecoderKeys._ private[decoder] class ConfluentAvroKafkaStreamDecoder( val topic: String, @@ -60,9 +61,10 @@ private[decoder] class ConfluentAvroKafkaStreamDecoder( private def getKeyValueDataFrame(dataFrame: DataFrame, keySchemaRegistrySettings: Map[String, String]) = { - val keyValueDf = dataFrame.select( + val decodedDf = dataFrame.select( from_confluent_avro(col("key"), keySchemaRegistrySettings) as 'key, from_confluent_avro(col("value"), valueSchemaRegistrySettings) as 'value) + val keyValueDf = setColumnNonNullable(decodedDf, "value") val keyColumnNames = keyValueDf.select("key.*").columns.toSeq val valueColumnNames = keyValueDf.select("value.*").columns.toSeq @@ -77,11 +79,17 @@ private[decoder] class ConfluentAvroKafkaStreamDecoder( } private def getValueDataFrame(dataFrame: DataFrame) = { - dataFrame + val decodedDf = dataFrame .select(from_confluent_avro(col("value"), valueSchemaRegistrySettings) as 'data) + setColumnNonNullable(decodedDf, "data") .select("data.*") } + private def setColumnNonNullable(dataFrame: DataFrame, columnName: String) = { + dataFrame + .filter(col(columnName).isNotNull) + .withColumn(columnName, new Column(AssertNotNull(col(columnName).expr))) + } }