Skip to content

Commit

Permalink
#137: Make column non nullable
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinwallimann committed Jun 15, 2020
1 parent d226f2f commit 359507a
Showing 1 changed file with 13 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@ package za.co.absa.hyperdrive.ingestor.implementation.decoder.avro.confluent
import org.apache.commons.configuration2.Configuration
import org.apache.commons.lang3.{RandomStringUtils, StringUtils}
import org.apache.logging.log4j.LogManager
import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.DataStreamReader
import org.apache.spark.sql.{Column, DataFrame}
import za.co.absa.abris.avro.functions.from_confluent_avro
import za.co.absa.abris.avro.read.confluent.SchemaManager._
import za.co.absa.hyperdrive.ingestor.api.context.HyperdriveContext
import za.co.absa.hyperdrive.ingestor.api.decoder.{StreamDecoder, StreamDecoderFactory}
import za.co.absa.hyperdrive.ingestor.implementation.utils.{SchemaRegistryConsumerConfigKeys, SchemaRegistrySettingsUtil}
import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.AvroKafkaStreamDecoderKeys._
import za.co.absa.hyperdrive.ingestor.api.utils.ConfigUtils
import za.co.absa.hyperdrive.ingestor.api.utils.ConfigUtils.getOrThrow
import za.co.absa.hyperdrive.ingestor.implementation.HyperdriveContextKeys
import za.co.absa.hyperdrive.ingestor.implementation.utils.{SchemaRegistryConsumerConfigKeys, SchemaRegistrySettingsUtil}
import za.co.absa.hyperdrive.shared.configurations.ConfigurationsKeys.AvroKafkaStreamDecoderKeys._

private[decoder] class ConfluentAvroKafkaStreamDecoder(
val topic: String,
Expand Down Expand Up @@ -60,9 +61,10 @@ private[decoder] class ConfluentAvroKafkaStreamDecoder(

private def getKeyValueDataFrame(dataFrame: DataFrame,
keySchemaRegistrySettings: Map[String, String]) = {
val keyValueDf = dataFrame.select(
val decodedDf = dataFrame.select(
from_confluent_avro(col("key"), keySchemaRegistrySettings) as 'key,
from_confluent_avro(col("value"), valueSchemaRegistrySettings) as 'value)
val keyValueDf = setColumnNonNullable(decodedDf, "value")

val keyColumnNames = keyValueDf.select("key.*").columns.toSeq
val valueColumnNames = keyValueDf.select("value.*").columns.toSeq
Expand All @@ -77,11 +79,17 @@ private[decoder] class ConfluentAvroKafkaStreamDecoder(
}

/**
 * Decodes the Confluent-Avro payload held in the `value` column and flattens
 * the decoded struct into top-level columns.
 *
 * The decoded struct is temporarily aliased as `data`, passed through
 * `setColumnNonNullable`, and then expanded with `data.*`.
 *
 * @param dataFrame input frame; must expose a `value` column holding the encoded payload
 * @return a frame whose columns are the fields of the decoded value struct
 */
private def getValueDataFrame(dataFrame: DataFrame) = {
  // Decode first, keeping the struct under a single alias so it can be
  // handed to setColumnNonNullable as one column before it is flattened.
  val decodedDf = dataFrame
    .select(from_confluent_avro(col("value"), valueSchemaRegistrySettings) as 'data)
  setColumnNonNullable(decodedDf, "data")
    .select("data.*")
}

/**
 * Returns a copy of `dataFrame` in which rows with a null `columnName` are
 * filtered out and the column itself is re-declared through Catalyst's
 * `AssertNotNull` expression.
 *
 * The filter runs first, so the `AssertNotNull` wrapper is only applied to
 * rows that have already passed the `isNotNull` check.
 *
 * @param dataFrame  frame containing the column to tighten
 * @param columnName name of the column to filter on and replace
 * @return the filtered frame with `columnName` replaced by its asserted form
 */
private def setColumnNonNullable(dataFrame: DataFrame, columnName: String) = {
  val targetColumn = col(columnName)
  // Drop null rows up front.
  val rowsWithValue = dataFrame.filter(targetColumn.isNotNull)
  // Wrap the raw expression in AssertNotNull and overwrite the column in place.
  val assertedColumn = new Column(AssertNotNull(targetColumn.expr))
  rowsWithValue.withColumn(columnName, assertedColumn)
}

}

Expand Down

0 comments on commit 359507a

Please sign in to comment.