Feature/259 spark 3.2 (#260)
* Upgrade to Spark 3.2

* Works. Now try with spark-2 and -Pall-tests

* exclude instead of enforce deps

* works for spark-3

* Exclude jackson-annotations only for spark-3 profile

* WIP

* Something doesn't work

* Use two different implementations for JsonNode

* Use different confluent versions for different abris versions

Signed-off-by: Kevin Wallimann <[email protected]>

* Remove obsolete properties

Signed-off-by: Kevin Wallimann <[email protected]>

* Add -X for debug

* Use maven 3.8.4

3.8.5 causes build failures

* Remove avro exclusion, probably redundant

Signed-off-by: Kevin Wallimann <[email protected]>

* hadoop-client-api provided

Signed-off-by: Kevin Wallimann <[email protected]>

* Fix "no such method" exception for commons-text StringLookupFactory dnsLookup

Signed-off-by: Kevin Wallimann <[email protected]>

* Remove obsolete test case

Signed-off-by: Kevin Wallimann <[email protected]>

* Increase Spark version to 3.2.2

Signed-off-by: Kevin Wallimann <[email protected]>
kevinwallimann authored Aug 3, 2022
1 parent ed18c59 commit 3c36424
Showing 13 changed files with 147 additions and 78 deletions.
@@ -311,15 +311,12 @@ class TestConfigUtils extends FlatSpec with Matchers with MockitoSugar {
val config = new BaseConfiguration
config.addProperty("key1", "")
config.addProperty("key2", "nay")
config.addProperty("key3", "1")
config.addProperty("key4", 0)

val ex1 = the[Exception] thrownBy ConfigUtils.getOptionalBoolean("key1", config)
ex1.getMessage should include("key1")
val ex2 = the[Exception] thrownBy ConfigUtils.getOptionalBoolean("key2", config)
ex2.getMessage should include("key2")
val ex3 = the[Exception] thrownBy ConfigUtils.getOptionalBoolean("key3", config)
ex3.getMessage should include("key3")
val ex4 = the[Exception] thrownBy ConfigUtils.getOptionalBoolean("key4", config)
ex4.getMessage should include("key4")
}
@@ -21,4 +21,6 @@ import org.apache.spark.sql.execution.streaming.MetadataLogFileIndex
trait CompatibleSparkUtil {
def createMetadataLogFileIndex(spark: SparkSession, destination: String): MetadataLogFileIndex
def hasMetadata(spark: SparkSession, destination: String): Boolean
def jsonStringToObject(jsonString: String): Object
def objectToJsonString(obj: Object): Option[String]
}
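
The two new JSON methods let each Spark-specific compatibility module bind JSON handling to the Jackson flavour its bundled Avro uses: the org.codehaus.jackson based SparkUtil below serves the Spark 2 build, the com.fasterxml.jackson based one the Spark 3.2 build. A minimal sketch of the intended round trip, assuming CompatibleSparkUtilProvider resolves to the matching implementation at runtime; the JsonRoundTripExample object and the Example schema are illustrative, not part of this change:

import org.apache.avro.{Schema, SchemaBuilder}
import za.co.absa.hyperdrive.compatibility.provider.CompatibleSparkUtilProvider

object JsonRoundTripExample {
  def main(args: Array[String]): Unit = {
    // A record with an int field carrying a default value, the kind of object
    // the Avro/Spark converters pass around.
    val schema: Schema = SchemaBuilder.record("Example").fields()
      .name("count").`type`().intType().intDefault(42)
      .endRecord()
    val defaultVal: Object = schema.getField("count").defaultVal()

    // Object -> JSON string; None if the value cannot be rendered as JSON.
    val jsonOpt: Option[String] = CompatibleSparkUtilProvider.objectToJsonString(defaultVal)
    // JSON string -> plain Java object again.
    val restoredOpt: Option[Object] = jsonOpt.map(CompatibleSparkUtilProvider.jsonStringToObject)

    println(s"default as JSON: $jsonOpt, restored: $restoredOpt")
  }
}

Both implementations delegate to Avro's JacksonUtils, so the JSON emitted for a field default matches what Avro itself would produce.
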
@@ -26,4 +26,10 @@ object CompatibleSparkUtilProvider extends CompatibleSparkUtil {

def hasMetadata(spark: SparkSession, destination: String): Boolean =
SparkUtil.hasMetadata(spark, destination)

override def jsonStringToObject(jsonString: String): Object =
SparkUtil.jsonStringToObject(jsonString)

override def objectToJsonString(obj: Object): Option[String] =
SparkUtil.objectToJsonString(obj)
}
@@ -15,15 +15,35 @@

package za.co.absa.hyperdrive.compatibility.impl

import org.apache.avro.util.internal.JacksonUtils
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex}
import org.codehaus.jackson.map.ObjectMapper
import za.co.absa.hyperdrive.compatibility.api.CompatibleSparkUtil

import java.io.ByteArrayOutputStream

object SparkUtil extends CompatibleSparkUtil {
private lazy val objectMapper = new ObjectMapper()

override def createMetadataLogFileIndex(spark: SparkSession, destination: String): MetadataLogFileIndex =
new MetadataLogFileIndex(spark, new Path(destination), None)

override def hasMetadata(spark: SparkSession, destination: String): Boolean =
FileStreamSink.hasMetadata(Seq(destination), spark.sparkContext.hadoopConfiguration)

override def jsonStringToObject(jsonString: String): Object = {
val jsonNode = objectMapper.readTree(jsonString)
JacksonUtils.toObject(jsonNode)
}

override def objectToJsonString(obj: Object): Option[String] = {
Option(JacksonUtils.toJsonNode(obj))
.map { json =>
val baos = new ByteArrayOutputStream()
objectMapper.writeValue(baos, json)
baos.toString
}
}
}
@@ -15,15 +15,35 @@

package za.co.absa.hyperdrive.compatibility.impl

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.avro.util.internal.JacksonUtils
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex}
import za.co.absa.hyperdrive.compatibility.api.CompatibleSparkUtil

import java.io.ByteArrayOutputStream

object SparkUtil extends CompatibleSparkUtil {
private lazy val objectMapper = new ObjectMapper()

override def createMetadataLogFileIndex(spark: SparkSession, destination: String): MetadataLogFileIndex =
new MetadataLogFileIndex(spark, new Path(destination), Map.empty, None)

override def hasMetadata(spark: SparkSession, destination: String): Boolean =
FileStreamSink.hasMetadata(Seq(destination), spark.sparkContext.hadoopConfiguration, spark.sessionState.conf)

override def jsonStringToObject(jsonString: String): Object = {
val jsonNode = objectMapper.readTree(jsonString)
JacksonUtils.toObject(jsonNode)
}

override def objectToJsonString(obj: Object): Option[String] = {
Option(JacksonUtils.toJsonNode(obj))
.map { json =>
val baos = new ByteArrayOutputStream()
objectMapper.writeValue(baos, json)
baos.toString
}
}
}
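
Besides the Jackson import, the two SparkUtil objects also absorb the signature differences of MetadataLogFileIndex and FileStreamSink.hasMetadata between Spark 2 and Spark 3. A small usage sketch under assumed conditions (a local SparkSession and a hypothetical sink directory); only hasMetadata and createMetadataLogFileIndex come from this change:

import org.apache.spark.sql.SparkSession
import za.co.absa.hyperdrive.compatibility.provider.CompatibleSparkUtilProvider

object MetadataLogExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("metadata-log-example").getOrCreate()
    val destination = "/tmp/hyperdrive/output" // hypothetical sink directory

    if (CompatibleSparkUtilProvider.hasMetadata(spark, destination)) {
      // The index exposes only files committed through the streaming sink's metadata log.
      val index = CompatibleSparkUtilProvider.createMetadataLogFileIndex(spark, destination)
      println(s"committed files: ${index.allFiles().size}")
    } else {
      println(s"$destination was not written by a file streaming sink")
    }
    spark.stop()
  }
}
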
12 changes: 0 additions & 12 deletions component-scanner/pom.xml
@@ -32,17 +32,5 @@
<artifactId>api_${scala.compat.version}</artifactId>
<version>4.6.1-SNAPSHOT</version>
</dependency>

<!--Spark-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.compat.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.compat.version}</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
4 changes: 4 additions & 0 deletions hyperdrive-release_spark-3/pom.xml
@@ -58,6 +58,10 @@
</goals>
<configuration>
<relocations>
<relocation>
<pattern>org.apache.commons.text</pattern>
<shadedPattern>org.apache.shaded.commons.text</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.commons.beanutils</pattern>
<shadedPattern>org.apache.shaded.commons.beanutils</shadedPattern>
1 change: 0 additions & 1 deletion ingestor-default/pom.xml
@@ -43,7 +43,6 @@
<artifactId>compatibility-provider_${scala.compat.version}</artifactId>
<version>${project.version}</version>
</dependency>

<!--ABRiS-->
<dependency>
<groupId>io.confluent</groupId>
@@ -17,20 +17,17 @@ package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent

import org.apache.avro.Schema
import org.apache.avro.Schema.Type._
import org.apache.avro.util.internal.JacksonUtils
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types._
import org.codehaus.jackson.map.ObjectMapper
import za.co.absa.abris.avro.sql.SchemaConverter
import za.co.absa.hyperdrive.compatibility.provider.CompatibleSparkUtilProvider
import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.SparkMetadataKeys._

import java.io.ByteArrayOutputStream
import scala.collection.JavaConverters._

// scalastyle:off
class AdvancedAvroToSparkConverter extends SchemaConverter {
override val shortName: String = AdvancedAvroToSparkConverter.name
private lazy val objectMapper = new ObjectMapper()

case class SchemaType(dataType: DataType, nullable: Boolean, avroType: Option[Schema])

@@ -53,14 +50,9 @@ class AdvancedAvroToSparkConverter extends SchemaConverter {
val newRecordNames = existingRecordNames + avroSchema.getFullName
val fields = avroSchema.getFields.asScala.map { f =>
val metadataBuilder = new MetadataBuilder()
val defaultJsonOpt = Option(JacksonUtils.toJsonNode(f.defaultVal()))
val defaultJsonOpt = CompatibleSparkUtilProvider.objectToJsonString(f.defaultVal())
val metadataBuilderWithDefault = defaultJsonOpt match {
case Some(defaultJson) =>
val baos = new ByteArrayOutputStream()
objectMapper.writeValue(baos, defaultJson)
val r = metadataBuilder.putString(DefaultValueKey, baos.toString)
baos.close()
r
case Some(defaultJson) => metadataBuilder.putString(DefaultValueKey, defaultJson)
case None => metadataBuilder
}

@@ -17,20 +17,18 @@ package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent

import org.apache.avro.LogicalTypes.TimestampMillis
import org.apache.avro.Schema.Type._
import org.apache.avro.util.internal.JacksonUtils
import org.apache.avro.{JsonProperties, LogicalTypes, Schema, SchemaBuilder}
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.Decimal.minBytesForPrecision
import org.apache.spark.sql.types._
import org.codehaus.jackson.map.ObjectMapper
import za.co.absa.hyperdrive.compatibility.provider.CompatibleSparkUtilProvider

import java.util.Objects
import scala.util.Try
import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.SparkMetadataKeys._

object AdvancedSparkToAvroConverter extends SparkToAvroConverter {
private lazy val nullSchema = Schema.create(Schema.Type.NULL)
private lazy val objectMapper = new ObjectMapper()

override def apply(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String): Schema =
toAvroType(catalystType, None, nullable, None, recordName, nameSpace)
@@ -80,8 +78,7 @@ object AdvancedSparkToAvroConverter extends SparkToAvroConverter {
.map(schema => new Schema.Parser().parse(schema))
val defaultValueOpt = Try(f.metadata.getString(DefaultValueKey))
.flatMap(defaultJsonString => Try {
val jsonNode = objectMapper.readTree(defaultJsonString)
JacksonUtils.toObject(jsonNode)
CompatibleSparkUtilProvider.jsonStringToObject(defaultJsonString)
}).toOption
val fieldAvroType =
toAvroType(f.dataType, schema, f.nullable, defaultValueOpt, f.name, childNameSpace)
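
Taken together, the two converter changes keep an Avro field default alive across a trip through a Spark schema: AdvancedAvroToSparkConverter stores the default as JSON in the field metadata under DefaultValueKey, and AdvancedSparkToAvroConverter reads it back via the compatibility provider. A condensed sketch of that flow, assuming DefaultValueKey is the string constant imported from SparkMetadataKeys as in both files; the DefaultValueRoundTrip object and the sample field are illustrative only:

import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StructField}
import za.co.absa.hyperdrive.compatibility.provider.CompatibleSparkUtilProvider
import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.SparkMetadataKeys._

import scala.util.Try

object DefaultValueRoundTrip {
  def main(args: Array[String]): Unit = {
    // Write side (AdvancedAvroToSparkConverter): store the Avro default as JSON in field metadata.
    val metadata = CompatibleSparkUtilProvider.objectToJsonString(Integer.valueOf(42))
      .map(json => new MetadataBuilder().putString(DefaultValueKey, json))
      .getOrElse(new MetadataBuilder())
      .build()
    val field = StructField("count", IntegerType, nullable = false, metadata = metadata)

    // Read side (AdvancedSparkToAvroConverter): recover the default as a plain Java object.
    val restoredDefault = Try(field.metadata.getString(DefaultValueKey))
      .map(CompatibleSparkUtilProvider.jsonStringToObject)
      .toOption
    println(restoredDefault) // Some(42) when the metadata key is present
  }
}
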
@@ -20,7 +20,7 @@ import org.apache.avro.util.Utf8
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute

import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

private[hyperdrive] object AvroUtil {

@@ -49,14 +49,19 @@ private[hyperdrive] object AvroUtil {
private def getFromGenericRecordNullSafe(record: GenericRecord, keys: Seq[String]) =
Option(record).flatMap(getFromGenericRecord(_, keys))

@tailrec
private def getFromGenericRecord(record: GenericRecord, keys: Seq[String]): Option[Any] = keys match {
case key :: Nil => Option(record.get(key))
case key :: Nil => getValueSafely(() => record.get(key))
case head :: tail =>
val value = record.get(head)
value match {
getValueSafely(() => record.get(head)).flatMap {
case genericRecord: GenericRecord => getFromGenericRecord(genericRecord, tail)
case _ => None
}
}

private def getValueSafely[T](fn: () => T): Option[T] = {
Try(fn.apply()) match {
case Success(value) => Option(value)
case Failure(_) => None
}
}
}
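
The getValueSafely guard is presumably needed because the Avro version bundled with Spark 3.2 makes GenericData.Record.get(fieldName) throw for a field name that is not in the schema, where the Avro 1.8 line returned null. A standalone illustration of the same pattern; the schema, record, and SafeLookupExample object are made up, and only the guard mirrors the helper above:

import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.{GenericData, GenericRecord}

import scala.util.Try

object SafeLookupExample {
  def main(args: Array[String]): Unit = {
    val schema = SchemaBuilder.record("Example").fields()
      .requiredString("known")
      .endRecord()
    val record: GenericRecord = new GenericData.Record(schema)
    record.put("known", "value")

    // Same guard as AvroUtil.getValueSafely: a throwing or null lookup becomes None.
    def getValueSafely[T](fn: () => T): Option[T] = Try(fn()).toOption.flatMap(Option(_))

    println(getValueSafely(() => record.get("known")))   // Some(value)
    println(getValueSafely(() => record.get("missing"))) // None, whether get throws or returns null
  }
}
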
60 changes: 20 additions & 40 deletions parent-conf/pom.xml
@@ -39,17 +39,12 @@

<properties>
<!--Enforced versions-->
<avro.version>1.8.2</avro.version> <!--Same as Abris uses-->
<commons.text.version>1.8</commons.text.version>
<paranamer.version>2.8</paranamer.version>

<!--Maven-->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

<!--ABRiS-->
<abris.version>5.1.1</abris.version>

<!--Scala-->
<scalatest.version>3.0.5</scalatest.version>
<scalatest.maven.version>2.0.2</scalatest.maven.version>
@@ -61,7 +56,6 @@
<spring.kafka.test.version>2.2.4.RELEASE</spring.kafka.test.version>
<kafka.spark.version>0-10</kafka.spark.version>
<testcontainers.kafka.version>1.15.1</testcontainers.kafka.version>
<kafka.avro.serializer.version>5.3.4</kafka.avro.serializer.version> <!--Same as Abris uses-->

<embedded.mongo.version>2.1.2</embedded.mongo.version>

@@ -86,16 +80,6 @@
</properties>

<dependencies>
<!--Enforced dependency versions-->
<!--Avro-->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
</dependency>
<dependency>
<groupId>com.thoughtworks.paranamer</groupId>
<artifactId>paranamer</artifactId>
@@ -139,17 +123,6 @@

<dependencies>
<!--Enforced dependency versions-->
<!--Avro-->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
<version>${commons.text.version}</version>
</dependency>
<dependency>
<groupId>com.thoughtworks.paranamer</groupId>
<artifactId>paranamer</artifactId>
@@ -199,13 +172,25 @@
<artifactId>spark-core_${scala.compat.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
</exclusion>
</exclusions>
</dependency>

<!--Kafka-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-${kafka.spark.version}_${scala.compat.version}</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- MongoDb Spark connector -->
@@ -263,19 +248,14 @@
<!-- Cannot have scope test because it is used by abris -->
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${kafka.avro.serializer.version}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${testcontainers.kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>${testcontainers.kafka.version}</version>
<scope>test</scope>
<version>${confluent.version}</version>
<exclusions>
<exclusion>
<!-- Excluded because version >= 2.12.0 is required for Spark 3.2.0 -->
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</dependencyManagement>