Skip to content

Commit

Permalink
Migrate to scio-0.14
Browse files Browse the repository at this point in the history
  • Loading branch information
RustedBones committed Jan 26, 2024
1 parent b171642 commit 0cb8f51
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 71 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ lazy val ratatoolCommon = project
"com.google.guava" % "guava" % guavaVersion,
"org.apache.beam" % "beam-vendor-guava-32_1_2-jre" % beamVendorVersion,
"com.google.apis" % "google-api-services-bigquery" % bigqueryVersion % Test,
"org.apache.avro" % "avro" % avroTestVersion % Test,
"org.apache.avro" % "avro" % avroTestVersion % Test classifier "tests",
"org.apache.avro" % "avro" % avroVersion % Test,
"org.apache.avro" % "avro" % avroVersion % Test classifier "tests",
"org.slf4j" % "slf4j-simple" % slf4jVersion % Test,
),
// In case of scalacheck failures print more info
Expand Down
4 changes: 1 addition & 3 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,4 @@ addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.10.0")
addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.11.1")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2")

libraryDependencies ++= Seq(
"org.apache.avro" % "avro-compiler" % "1.11.3"
)
libraryDependencies ++= Seq("org.apache.avro" % "avro-compiler" % "1.8.2")
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package com.spotify.ratatool.diffy

import com.spotify.scio.coders.Coder
import org.apache.avro.Schema
import org.apache.avro.{Conversion, Schema}
import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
import org.apache.avro.specific.SpecificData

import scala.jdk.CollectionConverters._
import scala.util.Try

/** Field level diff tool for Avro records. */
class AvroDiffy[T <: IndexedRecord: Coder](
Expand All @@ -30,6 +32,29 @@ class AvroDiffy[T <: IndexedRecord: Coder](
unorderedFieldKeys: Map[String, String] = Map()
) extends Diffy[T](ignore, unordered, unorderedFieldKeys) {

// Implementation version reported by the package of the Avro `Schema` class found at
// runtime, or None when it is not available.
// Both getPackage and getImplementationVersion may return null, so each result is wrapped
// in Option; a missing package then yields None instead of a NullPointerException.
private lazy val avroRuntimeVersion: Option[String] =
  Option(classOf[Schema].getPackage).flatMap(pkg => Option(pkg.getImplementationVersion))

/**
 * Looks up the `SpecificData` model attached to a generated Avro class.
 *
 * The static `MODEL$` field of `cls` is read reflectively. When the Avro runtime version
 * starts with "1.8.", the non-null entries of the static `conversions` field are also
 * registered on that model, because avro 1.8 generated code does not add conversions to
 * the data itself.
 *
 * If a reflective lookup fails with `NoSuchFieldException` or `IllegalAccessException`, the
 * default `SpecificData.get()` instance is returned instead. Any other failure is rethrown.
 *
 * Note: after avro 1.8, `SpecificData.getForClass` should be used instead.
 *
 * @param cls generated class to inspect
 * @return the model of `cls`, or the default instance when it cannot be read
 */
private def dataForClass(cls: Class[_]): SpecificData = {
  // Reads a static field of `cls` by name, regardless of its declared visibility.
  def staticField(name: String): AnyRef = {
    val field = cls.getDeclaredField(name)
    field.setAccessible(true)
    field.get(null)
  }

  val lookup = Try {
    val model = staticField("MODEL$").asInstanceOf[SpecificData]

    // avro 1.8 generated code does not add conversions to the data
    val isAvro18 = avroRuntimeVersion.exists(_.startsWith("1.8."))
    if (isAvro18) {
      staticField("conversions")
        .asInstanceOf[Array[Conversion[_]]]
        .filter(_ != null)
        .foreach(model.addLogicalTypeConversion)
    }

    model
  }

  lookup.recover { case _: NoSuchFieldException | _: IllegalAccessException =>
    // Return default instance
    SpecificData.get()
  }.get
}

override def apply(x: T, y: T): Seq[Delta] = (x, y) match {
case (null, null) => Seq.empty
case (_, null) => Seq(Delta("", Some(x), None, UnknownDelta))
Expand Down Expand Up @@ -60,7 +85,7 @@ class AvroDiffy[T <: IndexedRecord: Coder](
private def diff(x: AnyRef, y: AnyRef, schema: Schema, field: String): Seq[Delta] = {
val deltas = schema.getType match {
case Schema.Type.UNION =>
val data = GenericData.get()
val data = SpecificData.get()
val xTypeIndex = data.resolveUnion(schema, x)
val yTypeIndex = data.resolveUnion(schema, y)
if (xTypeIndex != yTypeIndex) {
Expand All @@ -74,7 +99,6 @@ class AvroDiffy[T <: IndexedRecord: Coder](
case Schema.Type.RECORD =>
val a = x.asInstanceOf[IndexedRecord]
val b = y.asInstanceOf[IndexedRecord]

for {
f <- schema.getFields.asScala.toSeq
pos = f.pos()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,32 @@

package com.spotify.ratatool.diffy

import java.nio.ByteBuffer
import com.google.api.services.bigquery.model.{TableFieldSchema, TableRow, TableSchema}
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding
import com.google.protobuf.AbstractMessage
import com.spotify.ratatool.Command
import com.spotify.ratatool.io.ParquetIO
import com.spotify.ratatool.samplers.{AvroSampler, ParquetSampler}
import com.spotify.ratatool.samplers.AvroSampler
import com.spotify.scio._
import com.spotify.scio.avro._
import com.spotify.scio.bigquery._
import com.spotify.scio.parquet.avro._
import com.spotify.scio.bigquery.client.BigQuery
import com.spotify.scio.bigquery.types.BigQueryType
import com.spotify.scio.coders.Coder
import com.spotify.scio.coders.kryo._
import com.spotify.scio.io.ClosedTap
import com.spotify.scio.parquet.avro._
import com.spotify.scio.values.SCollection
import com.twitter.algebird._
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType
import org.apache.avro.{Schema, SchemaCompatibility}
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DatumReader
import org.apache.avro.generic.GenericRecord
import org.apache.avro.specific.SpecificRecordBase
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder
import org.apache.beam.sdk.extensions.avro.io.AvroDatumFactory
import org.apache.beam.sdk.io.TextIO
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding
import org.slf4j.{Logger, LoggerFactory}

import java.nio.ByteBuffer
import scala.annotation.tailrec
import scala.jdk.CollectionConverters._
import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.language.higherKinds
import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}
Expand Down Expand Up @@ -201,28 +195,6 @@ object BigDiffy extends Command with Serializable {
val command: String = "bigDiffy"
@transient private lazy val logger: Logger = LoggerFactory.getLogger(BigDiffy.getClass)

// Utf8 values used for CharSequence fields interfere with Beam's immutability check.
// This datum factory therefore builds readers that use String instead of Utf8.
private object GenericRecordDatumFactory extends AvroDatumFactory.GenericDatumFactory {

  /** Generic reader that substitutes `String` wherever the parent picks `CharSequence`. */
  private class ScioGenericDatumReader extends GenericDatumReader[GenericRecord] {
    override def findStringClass(schema: Schema): Class[_] = {
      val resolved = super.findStringClass(schema)
      if (resolved == classOf[CharSequence]) classOf[String] else resolved
    }
  }

  /** Builds a reader with `reader` as its expected schema and `writer` as its schema. */
  override def apply(writer: Schema, reader: Schema): DatumReader[GenericRecord] = {
    val stringReader = new ScioGenericDatumReader()
    stringReader.setExpected(reader)
    stringReader.setSchema(writer)
    stringReader
  }
}

/** Scio coder for `GenericRecord`s of `schema`, backed by a Beam `AvroCoder` built with `GenericRecordDatumFactory`. */
def avroGenericRecordCoder(schema: Schema): Coder[GenericRecord] = {
  val beamCoder = AvroCoder.of(GenericRecordDatumFactory, schema)
  Coder.beam(beamCoder)
}

// (field, deltas, diff type)
type DeltaSCollection = SCollection[(MultiKey, (Seq[Delta], DiffType.Value))]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,20 @@

package com.spotify.ratatool.diffy

import java.nio.ByteBuffer

import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding
import com.spotify.scio.avro._
import com.spotify.ratatool.Schemas
import com.spotify.ratatool.avro.specific._
import com.spotify.ratatool.scalacheck._
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder
import com.spotify.scio.avro._
import com.spotify.scio.coders.{Coder, CoderMaterializer}
import org.apache.avro.generic.GenericRecordBuilder
import org.apache.beam.sdk.util.CoderUtils
import org.scalacheck.Arbitrary

import scala.jdk.CollectionConverters._
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.nio.ByteBuffer
import scala.jdk.CollectionConverters._

class AvroDiffyTest extends AnyFlatSpec with Matchers {

/** Wraps the given elements, in order, as a `java.util.List`. */
def jl[T](x: T*): java.util.List[T] = x.toList.asJava
Expand All @@ -49,7 +47,7 @@ class AvroDiffyTest extends AnyFlatSpec with Matchers {
}

it should "support nested fields" in {
val coder = AvroCoder.reflect(classOf[TestRecord])
val coder = CoderMaterializer.beamWithDefault(Coder[TestRecord])

val nnr = specificRecordOf[NullableNestedRecord].sample.get
nnr.setIntField(10)
Expand Down Expand Up @@ -86,7 +84,7 @@ class AvroDiffyTest extends AnyFlatSpec with Matchers {
}

it should "support repeated fields" in {
val coder = AvroCoder.reflect(classOf[TestRecord])
val coder = CoderMaterializer.beamWithDefault(Coder[TestRecord])

val x = specificRecordOf[TestRecord].sample.get
x.getRepeatedFields.setIntField(jl(10, 11))
Expand Down Expand Up @@ -130,7 +128,7 @@ class AvroDiffyTest extends AnyFlatSpec with Matchers {
}

it should "support unordered" in {
val coder = AvroCoder.reflect(classOf[TestRecord])
val coder = CoderMaterializer.beamWithDefault(Coder[TestRecord])

val a = NullableNestedRecord.newBuilder().setIntField(10).setLongField(100L).build()
val b = NullableNestedRecord.newBuilder().setIntField(20).setLongField(200L).build()
Expand All @@ -153,8 +151,8 @@ class AvroDiffyTest extends AnyFlatSpec with Matchers {
}

it should "support unordered nested" in {
val drnrCoder = AvroCoder.reflect(classOf[RepeatedRecord])
val drrCoder = AvroCoder.reflect(classOf[DeeplyRepeatedRecord])
val drnrCoder = CoderMaterializer.beamWithDefault(Coder[RepeatedRecord])
val drrCoder = CoderMaterializer.beamWithDefault(Coder[DeeplyRepeatedRecord])

val a = avroOf[RepeatedRecord].sample.get
a.setNestedRepeatedField(jl(10, 20, 30))
Expand Down Expand Up @@ -185,8 +183,8 @@ class AvroDiffyTest extends AnyFlatSpec with Matchers {
}

it should "support unordered nested of different lengths" in {
val drnrCoder = AvroCoder.reflect(classOf[RepeatedRecord])
val drrCoder = AvroCoder.reflect(classOf[DeeplyRepeatedRecord])
val drnrCoder = CoderMaterializer.beamWithDefault(Coder[RepeatedRecord])
val drrCoder = CoderMaterializer.beamWithDefault(Coder[DeeplyRepeatedRecord])

val a = avroOf[RepeatedRecord].sample.get
a.setNestedRepeatedField(jl(30, 20, 10))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ import com.google.api.services.bigquery.model.{TableFieldSchema, TableRow, Table
import com.spotify.ratatool.diffy.BigDiffy.{avroKeyFn, mergeTableSchema, stripQuoteWrap}
import com.spotify.ratatool.io.{ParquetIO, ParquetTestData}
import com.spotify.scio.ScioContext
import com.spotify.scio.coders.{Coder, CoderMaterializer}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder

import scala.jdk.CollectionConverters._
Expand All @@ -41,7 +41,7 @@ import scala.language.higherKinds
class BigDiffyTest extends PipelineSpec {

val keys = (1 to 1000).map(k => MultiKey("key" + k))
val coder = AvroCoder.reflect(classOf[TestRecord])
val coder = CoderMaterializer.beamWithDefault(Coder[TestRecord])

/** Fixed to a small range so that Std. Dev. & Variance calculations are easier to predict */
val rnr = specificRecordOf[RequiredNestedRecord]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,26 @@ import com.spotify.ratatool.diffy.{AvroDiffy, BigDiffy, MultiKey}
import com.spotify.scio.ContextAndArgs
import org.apache.beam.sdk.util.CoderUtils
import com.spotify.scio.avro._
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder
import com.spotify.scio.coders.{Coder, CoderMaterializer}

object PreProcessBigDiffy {
/** Builds the diff key of a record from the string form of its record id. */
def recordKeyFn(r: ExampleRecord): MultiKey = {
  val recordId = r.getRecordId.toString
  MultiKey(recordId)
}

def mapFn(coder: => AvroCoder[ExampleRecord])(r: ExampleRecord): ExampleRecord = {
val o = CoderUtils.clone(coder, r)
if (o.getNullableIntField == null) {
o.setNullableIntField(0)
def mapFn(coder: Coder[ExampleRecord]): ExampleRecord => ExampleRecord = {
val bCoder = CoderMaterializer.beamWithDefault(coder)

{ (r: ExampleRecord) =>
val o = CoderUtils.clone(bCoder, r)
if (o.getNullableIntField == null) {
o.setNullableIntField(0)
}
o
}
o
}

def main(cmdlineArgs: Array[String]): Unit = {
@transient lazy val coder = AvroCoder.reflect(classOf[ExampleRecord])
val coder = Coder[ExampleRecord]
val (sc, args) = ContextAndArgs(cmdlineArgs)

val (lhsPath, rhsPath, output, header, ignore, unordered) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,19 @@ import com.spotify.scio.testing._
import org.apache.beam.sdk.util.CoderUtils
import org.scalacheck.Gen
import com.spotify.scio.avro._
import com.spotify.scio.coders.{Coder, CoderMaterializer}
import com.spotify.scio.io.TextIO
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder

class PreProcessBigDiffyJobTest extends PipelineSpec {

val coder = CoderMaterializer.beamWithDefault(Coder[ExampleRecord])

val lhs = Gen.listOfN(1000, ExampleAvroGen.exampleRecordGen).sample.get.map { r =>
r.setNullableIntField(null)
r
}

val rhs = lhs.map(CoderUtils.clone(AvroCoder.reflect(classOf[ExampleRecord]), _)).map { r =>
val rhs = lhs.map(CoderUtils.clone(coder, _)).map { r =>
r.setNullableIntField(0)
r
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,25 @@ object AvroGeneratorOps extends AvroGeneratorOps

trait AvroGeneratorOps {

// Implementation version reported by the package of the Avro `Schema` class found at
// runtime, or None when it is not available.
// Both getPackage and getImplementationVersion may return null, so each result is wrapped
// in Option; a missing package then yields None instead of a NullPointerException.
private lazy val avroRuntimeVersion: Option[String] =
  Option(classOf[Schema].getPackage).flatMap(pkg => Option(pkg.getImplementationVersion))

// SpecificData.getForClass is only available after avro 1.8; use it once avro 1.8 support is dropped
private def dataForClass(cls: Class[_]): SpecificData = Try {
val modelField = cls.getDeclaredField("MODEL$")
modelField.setAccessible(true)
modelField.get(null).asInstanceOf[SpecificData]
}.recover { case _: NoSuchFieldException =>
val data = modelField.get(null).asInstanceOf[SpecificData]

// avro 1.8 generated code does not add conversions to the data
if (avroRuntimeVersion.exists(_.startsWith("1.8."))) {
val conversionsField = cls.getDeclaredField("conversions")
conversionsField.setAccessible(true)
val conversions = conversionsField.get(null).asInstanceOf[Array[Conversion[_]]]
conversions.filterNot(_ == null).foreach(data.addLogicalTypeConversion)
}

data
}.recover { case _: NoSuchFieldException | _: IllegalAccessException =>
// Return default instance
SpecificData.get()
}.get
Expand Down

0 comments on commit 0cb8f51

Please sign in to comment.