Merge branch 'develop' into feature/259-spark-3.2
kevinwallimann authored Feb 15, 2022
2 parents 6f9cbeb + 712fe9b commit 86e8e75
Showing 27 changed files with 1,248 additions and 26 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/build.yml
@@ -29,8 +29,8 @@ jobs:
restore-keys: |
${{ runner.os }}-${{ matrix.scala }}-${{ matrix.spark }}-
- name: License check
-run: mvn clean validate -Plicense-check,spark-2,spark-3,scala-2.12
+run: mvn clean validate --no-transfer-progress -Plicense-check,spark-2,spark-3,scala-2.12
- name: Switch scala version
-run: mvn scala-cross-build:change-version -Pscala-${{ matrix.scala }},spark-${{ matrix.spark }}
+run: mvn scala-cross-build:change-version --no-transfer-progress -Pscala-${{ matrix.scala }},spark-${{ matrix.spark }}
- name: Build and run tests
-run: mvn clean verify -Pscala-${{ matrix.scala }},spark-${{ matrix.spark }},all-tests
+run: mvn clean verify --no-transfer-progress -Pscala-${{ matrix.scala }},spark-${{ matrix.spark }},all-tests
17 changes: 11 additions & 6 deletions README.md
@@ -123,9 +123,9 @@ If no file exists, the reader will fail.

Any additional properties can be added with the prefix `reader.parquet.options.`. See [Spark Structured Streaming Documentation](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources)

-##### ConfluentAvroStreamDecodingTransformer
-The `ConfluentAvroStreamDecodingTransformer` is built on [ABRiS](https://github.com/AbsaOSS/ABRiS). More details about the configuration properties can be found there.
-**Caution**: The `ConfluentAvroStreamDecodingTransformer` requires the property `reader.kafka.topic` to be set.
+##### ConfluentAvroDecodingTransformer
+The `ConfluentAvroDecodingTransformer` is built on [ABRiS](https://github.com/AbsaOSS/ABRiS). More details about the configuration properties can be found there.
+**Caution**: The `ConfluentAvroDecodingTransformer` requires the property `reader.kafka.topic` to be set.

| Property Name | Required | Description |
| :--- | :---: | :--- |
@@ -142,14 +142,18 @@
| `transformer.{transformer-id}.keep.columns` | No | Comma-separated list of columns to keep (e.g. offset, partition) |
| `transformer.{transformer-id}.disable.nullability.preservation` | No | Set to `true` to ignore the fix for [#137](https://github.com/AbsaOSS/hyperdrive/issues/137) and keep the same behaviour as in versions up to and including v3.2.2. Default value: `false` |
| `transformer.{transformer-id}.schema.registry.basic.auth.user.info.file` | No | A path to a text file that contains one line in the form `<username>:<password>`. It will be passed as `basic.auth.user.info` to the schema registry config |
+| `transformer.{transformer-id}.use.advanced.schema.conversion` | No | Set to `true` to convert the Avro schema using [AdvancedAvroToSparkConverter](https://github.com/AbsaOSS/hyperdrive/blob/develop/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/AdvancedAvroToSparkConverter.scala), which puts the default value and underlying Avro type into the struct field metadata. Default: `false` |

For detailed information on the subject name strategy, please take a look at the [Schema Registry Documentation](https://docs.confluent.io/current/schema-registry/).

Any additional properties for the schema registry config can be added with the prefix `transformer.{transformer-id}.schema.registry.options.`

-##### ConfluentAvroStreamEncodingTransformer
-The `ConfluentAvroStreamEncodingTransformer` is built on [ABRiS](https://github.com/AbsaOSS/ABRiS). More details about the configuration properties can be found there.
-**Caution**: The `ConfluentAvroStreamEncodingTransformer` requires the property `writer.kafka.topic` to be set.
+Note: `use.advanced.schema.conversion` only works with a patched version of Spark, due to bug [SPARK-34805](https://issues.apache.org/jira/browse/SPARK-34805).
+For the latest version of Spark, the patch is available in https://github.com/apache/spark/pull/35270. For other versions of Spark, the changes need to be cherry-picked and built locally.
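As an illustration of what the advanced conversion adds, the sketch below is not part of the commit; the DataFrame name and the external visibility of `SparkMetadataKeys` are assumptions. It inspects a decoded DataFrame and prints the Avro type and default value that the converter stores in each field's metadata:

```scala
import org.apache.spark.sql.DataFrame
import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.SparkMetadataKeys

// `decoded` is assumed to be the DataFrame produced by the ConfluentAvroDecodingTransformer
// with transformer.{transformer-id}.use.advanced.schema.conversion=true
def printAvroFieldMetadata(decoded: DataFrame): Unit =
  decoded.schema.fields.foreach { field =>
    if (field.metadata.contains(SparkMetadataKeys.AvroTypeKey)) {
      println(s"${field.name} avro type: ${field.metadata.getString(SparkMetadataKeys.AvroTypeKey)}")
    }
    if (field.metadata.contains(SparkMetadataKeys.DefaultValueKey)) {
      println(s"${field.name} default: ${field.metadata.getString(SparkMetadataKeys.DefaultValueKey)}")
    }
  }
```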

+##### ConfluentAvroEncodingTransformer
+The `ConfluentAvroEncodingTransformer` is built on [ABRiS](https://github.com/AbsaOSS/ABRiS). More details about the configuration properties can be found there.
+**Caution**: The `ConfluentAvroEncodingTransformer` requires the property `writer.kafka.topic` to be set.

| Property Name | Required | Description |
| :--- | :---: | :--- |
@@ -164,6 +168,7 @@
| `transformer.{transformer-id}.key.schema.record.namespace` | Yes for key naming strategies `record.name` and `topic.record.name` | Namespace of the record. |
| `transformer.{transformer-id}.key.optional.fields` | No | Comma-separated list of nullable key columns that should get the default value `null` in the Avro schema. Nested column names should be concatenated with a dot (`.`) |
| `transformer.{transformer-id}.schema.registry.basic.auth.user.info.file` | No | A path to a text file that contains one line in the form `<username>:<password>`. It will be passed as `basic.auth.user.info` to the schema registry config |
+| `transformer.{transformer-id}.use.advanced.schema.conversion` | No | Set to `true` to convert the Spark schema to Avro using [AdvancedSparkToAvroConverter](https://github.com/AbsaOSS/hyperdrive/blob/develop/ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/transformer/avro/confluent/AdvancedSparkToAvroConverter.scala), which reads the default value and underlying Avro type from the struct field metadata. Default: `false` |

Any additional properties for the schema registry config can be added with the prefix `transformer.{transformer-id}.schema.registry.options.`

@@ -0,0 +1,15 @@
#
# Copyright 2018 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.AdvancedAvroToSparkConverter
@@ -0,0 +1,135 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent

import org.apache.avro.Schema
import org.apache.avro.Schema.Type._
import org.apache.avro.util.internal.JacksonUtils
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types._
import org.codehaus.jackson.map.ObjectMapper
import za.co.absa.abris.avro.sql.SchemaConverter
import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.SparkMetadataKeys._

import java.io.ByteArrayOutputStream
import scala.collection.JavaConverters._

// scalastyle:off
class AdvancedAvroToSparkConverter extends SchemaConverter {
  override val shortName: String = AdvancedAvroToSparkConverter.name
  private lazy val objectMapper = new ObjectMapper()

  case class SchemaType(dataType: DataType, nullable: Boolean, avroType: Option[Schema])

  /**
   * This function takes an avro schema and returns a sql schema.
   */
  override def toSqlType(avroSchema: Schema): DataType = {
    toSqlTypeHelper(avroSchema, Set.empty).dataType
  }

  def toSqlTypeHelper(avroSchema: Schema, existingRecordNames: Set[String]): SchemaType = {
    avroSchema.getType match {
      case RECORD =>
        if (existingRecordNames.contains(avroSchema.getFullName)) {
          throw new IncompatibleSchemaException(s"""
            |Found recursive reference in Avro schema, which can not be processed by Spark:
            |${avroSchema.toString(true)}
          """.stripMargin)
        }
        val newRecordNames = existingRecordNames + avroSchema.getFullName
        val fields = avroSchema.getFields.asScala.map { f =>
          val metadataBuilder = new MetadataBuilder()
          val defaultJsonOpt = Option(JacksonUtils.toJsonNode(f.defaultVal()))
          val metadataBuilderWithDefault = defaultJsonOpt match {
            case Some(defaultJson) =>
              val baos = new ByteArrayOutputStream()
              objectMapper.writeValue(baos, defaultJson)
              val r = metadataBuilder.putString(DefaultValueKey, baos.toString)
              baos.close()
              r
            case None => metadataBuilder
          }

          val schemaType = toSqlTypeHelper(f.schema(), newRecordNames)
          schemaType.avroType
            .map(_.toString)
            .map(schema => metadataBuilderWithDefault.putString(AvroTypeKey, schema).build())
            .map(metadata => StructField(f.name, schemaType.dataType, schemaType.nullable, metadata))
            .getOrElse(StructField(f.name, schemaType.dataType, schemaType.nullable, metadataBuilderWithDefault.build()))
        }

        SchemaType(StructType(fields), nullable = false, None)

      case ARRAY =>
        val schemaType = toSqlTypeHelper(avroSchema.getElementType, existingRecordNames)
        SchemaType(
          ArrayType(schemaType.dataType, containsNull = schemaType.nullable),
          nullable = false,
          schemaType.avroType)

      case MAP =>
        val schemaType = toSqlTypeHelper(avroSchema.getValueType, existingRecordNames)
        SchemaType(
          MapType(StringType, schemaType.dataType, valueContainsNull = schemaType.nullable),
          nullable = false,
          schemaType.avroType)

      case UNION =>
        if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) {
          // In case of a union with null, eliminate it and make a recursive call
          val remainingUnionTypes = avroSchema.getTypes.asScala.filterNot(_.getType == NULL)
          if (remainingUnionTypes.size == 1) {
            toSqlTypeHelper(remainingUnionTypes.head, existingRecordNames).copy(nullable = true)
          } else {
            toSqlTypeHelper(Schema.createUnion(remainingUnionTypes.asJava), existingRecordNames)
              .copy(nullable = true)
          }
        } else avroSchema.getTypes.asScala.map(_.getType) match {
          case Seq(t1) =>
            toSqlTypeHelper(avroSchema.getTypes.get(0), existingRecordNames)
          case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) =>
            SchemaType(LongType, nullable = false, Option(avroSchema))
          case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) =>
            SchemaType(DoubleType, nullable = false, Option(avroSchema))
          case _ =>
            // Convert complex unions to struct types where field names are member0, member1, etc.
            // This is consistent with the behavior when converting between Avro and Parquet.
            val fields = avroSchema.getTypes.asScala.zipWithIndex.map {
              case (s, i) =>
                val schemaType = toSqlTypeHelper(s, existingRecordNames)
                schemaType.avroType
                  .map(_.toString)
                  .map(schema => new MetadataBuilder().putString(AvroTypeKey, schema).build())
                  .map(metadata => StructField(s"member$i", schemaType.dataType, schemaType.nullable, metadata))
                  // All fields are nullable because only one of them is set at a time
                  .getOrElse(StructField(s"member$i", schemaType.dataType, nullable = true))
            }

            SchemaType(StructType(fields), nullable = false, None)
        }

      case _ =>
        val originalSchemaType = SchemaConverters.toSqlType(avroSchema)
        SchemaType(originalSchemaType.dataType, originalSchemaType.nullable, Option(avroSchema))
    }
  }
}

// scalastyle:on
object AdvancedAvroToSparkConverter {
  val name = "advanced"
}
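A minimal usage sketch, not part of the commit: it converts an illustrative Avro record with a default-valued union field and prints the metadata the converter attaches to each resulting struct field.

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType
import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.AdvancedAvroToSparkConverter

object AdvancedAvroToSparkConverterExample extends App {
  val avroSchema = new Schema.Parser().parse(
    """{
      |  "type": "record", "name": "Example",
      |  "fields": [
      |    {"name": "id", "type": "long"},
      |    {"name": "note", "type": ["null", "string"], "default": null}
      |  ]
      |}""".stripMargin)

  // toSqlType returns a StructType for record schemas; each field carries the original
  // Avro type (and, where present, the default value) in its metadata.
  val sparkType = new AdvancedAvroToSparkConverter().toSqlType(avroSchema).asInstanceOf[StructType]
  sparkType.fields.foreach(f => println(s"${f.name}: ${f.dataType}, metadata=${f.metadata.json}"))
}
```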
@@ -0,0 +1,131 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent

import org.apache.avro.LogicalTypes.TimestampMillis
import org.apache.avro.Schema.Type._
import org.apache.avro.util.internal.JacksonUtils
import org.apache.avro.{JsonProperties, LogicalTypes, Schema, SchemaBuilder}
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.Decimal.minBytesForPrecision
import org.apache.spark.sql.types._
import org.codehaus.jackson.map.ObjectMapper

import java.util.Objects
import scala.util.Try
import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.SparkMetadataKeys._

object AdvancedSparkToAvroConverter extends SparkToAvroConverter {
  private lazy val nullSchema = Schema.create(Schema.Type.NULL)
  private lazy val objectMapper = new ObjectMapper()

  override def apply(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String): Schema =
    toAvroType(catalystType, None, nullable, None, recordName, nameSpace)

  // scalastyle:off
  private def toAvroType(
    catalystType: DataType,
    avroSchema: Option[Schema],
    nullable: Boolean = false,
    defaultValue: Option[Object] = None,
    recordName: String = "topLevelRecord",
    nameSpace: String = "")
  : Schema = {
    val builder = SchemaBuilder.builder()

    val schema = catalystType match {
      case TimestampType => avroSchema match {
        case Some(schema) if schema.getLogicalType.isInstanceOf[TimestampMillis] =>
          LogicalTypes.timestampMillis().addToSchema(builder.longType())
        case _ => LogicalTypes.timestampMicros().addToSchema(builder.longType())
      }
      case d: DecimalType => avroSchema match {
        case Some(schema) if schema.getType == BYTES =>
          val avroType = LogicalTypes.decimal(d.precision, d.scale)
          avroType.addToSchema(SchemaBuilder.builder().bytesType())
        case _ => getDecimalFixedType(d, avroSchema, nameSpace, recordName)
      }
      case BinaryType => avroSchema match {
        case Some(schema) if schema.getType == FIXED =>
          val name = getFixedName(recordName, nameSpace)
          builder
            .fixed(name)
            .size(schema.getFixedSize)
        case _ => builder.bytesType()
      }
      case ArrayType(et, containsNull) =>
        builder.array()
          .items(toAvroType(et, avroSchema, containsNull, defaultValue, recordName, nameSpace))
      case MapType(StringType, vt, valueContainsNull) =>
        builder.map()
          .values(toAvroType(vt, avroSchema, valueContainsNull, defaultValue, recordName, nameSpace))
      case st: StructType =>
        val childNameSpace = if (nameSpace != "") s"$nameSpace.$recordName" else recordName
        val fieldsAssembler = builder.record(recordName).namespace(nameSpace).fields()
        st.foreach { f =>
          val schema = Try(f.metadata.getString(AvroTypeKey)).toOption
            .map(schema => new Schema.Parser().parse(schema))
          val defaultValueOpt = Try(f.metadata.getString(DefaultValueKey))
            .flatMap(defaultJsonString => Try {
              val jsonNode = objectMapper.readTree(defaultJsonString)
              JacksonUtils.toObject(jsonNode)
            }).toOption
          val fieldAvroType =
            toAvroType(f.dataType, schema, f.nullable, defaultValueOpt, f.name, childNameSpace)
          defaultValueOpt match {
            case Some(defaultObject) if !Objects.equals(defaultObject, JsonProperties.NULL_VALUE) =>
              fieldsAssembler.name(f.name).`type`(fieldAvroType).withDefault(defaultObject)
            case Some(_) =>
              fieldsAssembler.name(f.name).`type`(fieldAvroType).withDefault(null)
            case _ => fieldsAssembler.name(f.name).`type`(fieldAvroType).noDefault()
          }
        }
        fieldsAssembler.endRecord()

      // nullability is handled later in this method, thus pass nullable = false
      case _ => SchemaConverters.toAvroType(catalystType, nullable = false, recordName, nameSpace)
    }
    if (nullable) {
      defaultValue match {
        case Some(value) if !value.isInstanceOf[JsonProperties.Null] => Schema.createUnion(schema, nullSchema)
        case _ => Schema.createUnion(nullSchema, schema)
      }
    } else {
      schema
    }
  }

  // scalastyle:on
  private def getDecimalFixedType(d: DecimalType, avroSchema: Option[Schema], nameSpace: String, recordName: String) = {
    val avroType = LogicalTypes.decimal(d.precision, d.scale)
    val name = getFixedName(recordName, nameSpace)
    val minBytes = minBytesForPrecision(d.precision)
    val size = avroSchema.map { schema =>
      if (schema.getFixedSize > minBytes) schema.getFixedSize else minBytes
    }.getOrElse {
      minBytes
    }
    avroType.addToSchema(SchemaBuilder.fixed(name).size(size))
  }

  private def getFixedName(recordName: String, nameSpace: String) = {
    // Need to avoid naming conflict for the fixed fields
    nameSpace match {
      case "" => s"$recordName.fixed"
      case _ => s"$nameSpace.$recordName.fixed"
    }
  }
}
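And the reverse direction, again only as a sketch that is not part of the commit; the record name and namespace are made up. Fields without `AvroTypeKey`/`DefaultValueKey` metadata fall back to the standard Spark-to-Avro mapping, so the extra metadata written by `AdvancedAvroToSparkConverter` is what allows the original Avro types and defaults to be restored.

```scala
import org.apache.spark.sql.types._
import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.AdvancedSparkToAvroConverter

object AdvancedSparkToAvroConverterExample extends App {
  val sparkSchema = StructType(Seq(
    StructField("id", LongType, nullable = false),
    StructField("note", StringType, nullable = true)
  ))

  // Without field metadata this uses the default mapping; nullable fields become unions with null.
  val avroSchema = AdvancedSparkToAvroConverter(
    catalystType = sparkSchema, nullable = false, recordName = "Example", nameSpace = "com.example")
  println(avroSchema.toString(true))
}
```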
@@ -113,6 +113,7 @@ object ConfluentAvroDecodingTransformer
override val namingStrategy: String = KEY_SCHEMA_REGISTRY_VALUE_NAMING_STRATEGY
override val recordName: String = KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAME
override val recordNamespace: String = KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAMESPACE
+override val useAdvancedSchemaConversion: String = KEY_USE_ADVANCED_SCHEMA_CONVERSION
}

override def apply(config: Configuration): StreamTransformer = {