Improve avro support #691

Merged: 13 commits, Feb 16, 2024
5 changes: 3 additions & 2 deletions build.sbt
@@ -23,7 +23,7 @@ val algebirdVersion = "0.13.10"
// Keep in sync with Scio: https://github.com/spotify/scio/blob/v0.14.0/build.sbt
val scioVersion = "0.14.0"

val avroVersion = "1.8.2" // keep in sync with scio
val avroVersion = avroCompilerVersion // keep in sync with scio
val beamVersion = "2.53.0" // keep in sync with scio
val beamVendorVersion = "0.1" // keep in sync with scio
val bigqueryVersion = "v2-rev20230812-2.0.0" // keep in sync with scio
@@ -163,9 +163,9 @@ lazy val ratatoolCommon = project
name := "ratatool-common",
libraryDependencies ++= Seq(
"org.apache.avro" % "avro" % avroVersion,
"org.apache.avro" % "avro-mapred" % avroVersion classifier "hadoop2",
"com.google.guava" % "guava" % guavaVersion,
"com.google.apis" % "google-api-services-bigquery" % bigqueryVersion % Test,
"org.apache.avro" % "avro" % avroVersion % Test,
"org.apache.avro" % "avro" % avroVersion % Test classifier "tests",
"org.slf4j" % "slf4j-simple" % slf4jVersion % Test,
),
@@ -285,6 +285,7 @@ lazy val ratatoolScalacheck = project
name := "ratatool-scalacheck",
libraryDependencies ++= Seq(
"org.apache.avro" % "avro" % avroVersion,
"joda-time" % "joda-time" % jodaTimeVersion,
"org.scalacheck" %% "scalacheck" % scalaCheckVersion,
"org.apache.beam" % "beam-sdks-java-core" % beamVersion,
"org.apache.beam" % "beam-sdks-java-extensions-avro" % beamVersion,
4 changes: 1 addition & 3 deletions project/plugins.sbt
@@ -9,6 +9,4 @@ addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.10.0")
addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.11.1")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2")

libraryDependencies ++= Seq(
"org.apache.avro" % "avro-compiler" % "1.8.2"
)
libraryDependencies ++= Seq("org.apache.avro" % "avro-compiler" % "1.8.2")
239 changes: 144 additions & 95 deletions ratatool-diffy/src/main/scala/com/spotify/ratatool/diffy/AvroDiffy.scala
@@ -18,118 +18,167 @@
package com.spotify.ratatool.diffy

import com.spotify.scio.coders.Coder
import org.apache.avro.{Schema, SchemaValidatorBuilder}
import org.apache.avro.generic.GenericRecord
import scala.util.Try
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.avro.specific.SpecificData

import scala.jdk.CollectionConverters._

/** Field level diff tool for Avro records. */
class AvroDiffy[T <: GenericRecord: Coder](
/**
* Field level diff tool for Avro records.
*
* @param ignore
* specify set of fields to ignore during comparison.
* @param unordered
* a list of fields to be treated as unordered, i.e. sort before comparison.
* @param unorderedFieldKeys
* a map of record field names to field names that can be keyed by when comparing nested repeated
* records. (currently not supported in the CLI)
*/
class AvroDiffy[T <: IndexedRecord: Coder](
ignore: Set[String] = Set.empty,
unordered: Set[String] = Set.empty,
unorderedFieldKeys: Map[String, String] = Map()
) extends Diffy[T](ignore, unordered, unorderedFieldKeys) {

override def apply(x: T, y: T): Seq[Delta] = {
new SchemaValidatorBuilder().canReadStrategy
.validateLatest()
.validate(y.getSchema, List(x.getSchema).asJava)
Comment on lines -35 to -37

Member:
Were there any breaking changes in this implementation, or is there another reason why we're moving away from avro schema validation? I'm not very familiar with how strict the avro definition of "compatible schemas" is, but at first glance it seems like we're losing some flexibility and/or some level of detail with the new validations. This is not my area of expertise, though, so your recommendations are more than welcome!

Contributor Author:
In terms of diff, avro schemas must be strictly equal. This is checked at line 62 in the new version.

Compatible schemas are used at read time to adapt stored data to the desired read schema. Once in memory, we should not compare data constructed with different schemas, even if compatible.

Contributor:
Can you expand on what you mean? I'm a bit confused reading this thread.

Contributor Author:
Avro records with different schemas must not be compared.

Schema compatibility is relevant when reading, to make sure the writerSchema and the readerSchema are compatible. Once read, the records strictly follow the readerSchema, where field index matters. Strict schema equality must be ensured before comparing content.
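
A minimal sketch of that read-time resolution, assuming a compatible writer/reader schema pair and an Avro-encoded payload (all names here are illustrative, not part of this PR):

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

def readWithResolution(bytes: Array[Byte], writerSchema: Schema, readerSchema: Schema): GenericRecord = {
  // Avro adapts data written with writerSchema to readerSchema while decoding
  val datumReader = new GenericDatumReader[GenericRecord](writerSchema, readerSchema)
  val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
  // the returned record strictly follows readerSchema
  datumReader.read(null, decoder)
}

Only records decoded with the same reader schema should then be handed to AvroDiffy.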

Contributor:
Ok, trying to confirm my understanding here since I'm still a bit confused.
Dataset A is updated to add a new nullable field x and becomes Dataset A'.
We go to diff these two datasets.
Are you saying that this field comparison will end up in one of the above null cases prior to the schema comparison?

Contributor (@idreeskhan, Feb 13, 2024):
I would go one step further and say it should necessarily be the A' schema, even if it's nullable/has a default, and cases where a field is missing should fail. Semantically, it's still a difference between the two datasets. IIRC this is the current functionality. It's unclear to me if/where this behaviour is retained.

Contributor Author:
The BigDiffy API of this lib is not file aware. It only works in terms of in-memory records and can't make any assumptions about the writer schema.

It is up to the users creating the SCollection to read using the correct schema.

Contributor Author:
Even for the diffAvro API, the reader schema used is the one from the generated class.

It is entirely possible that the underlying files use a different schema; ratatool-diffy will miss those differences.

Contributor (@clairemcginty, Feb 14, 2024):
The BigDiffy API of this lib is not file aware.

cc @RustedBones, AFAIK, BigDiffy is file-aware when run through the CLI, which invokes BigDiffy#run:

val schema = new AvroSampler(rhs, conf = Some(sc.options))
.sample(1, head = true)
.head
.getSchema
implicit val grCoder: Coder[GenericRecord] = avroGenericRecordCoder(schema)
val diffy = new AvroDiffy[GenericRecord](ignore, unordered, unorderedKeys)
val lhsSCollection = sc.avroFile(lhs, schema)
val rhsSCollection = sc.avroFile(rhs, schema)
BigDiffy
.diff[GenericRecord](lhsSCollection, rhsSCollection, diffy, avroKeyFn(keys), ignoreNan)

but even then, it looks like it selects the schema associated with the RHS and uses that for both resulting SCollections. So maybe we could add schema validation there (ensure that RHS schema is equal to, or a superset of, the LHS schema)?

Contributor Author:
Added an extra check that prefers a backward-compatible reader schema. When the schemas are different but both forward and backward compatible, it will print a warning.

diff(Option(x), Option(y), "")
override def apply(x: T, y: T): Seq[Delta] = (x, y) match {
case (null, null) => Seq.empty
case (_, null) => Seq(Delta("", Some(x), None, UnknownDelta))
case (null, _) => Seq(Delta("", None, Some(y), UnknownDelta))
case _ if x.getSchema != y.getSchema => Seq(Delta("", Some(x), Some(y), UnknownDelta))
case _ => diff(x, y, x.getSchema, "")
}
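
As a usage sketch (assuming schema is already parsed and recordX/recordY were decoded with it; the coder wiring mirrors the CLI path quoted above):

implicit val grCoder: Coder[GenericRecord] = avroGenericRecordCoder(schema)
val diffy = new AvroDiffy[GenericRecord](ignore = Set("updated_at")) // ignored field name is illustrative
val deltas: Seq[Delta] = diffy(recordX, recordY) // empty when the records are identical

Records with different schemas short-circuit to a single root-level UnknownDelta instead of a field-by-field diff.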

def isAvroRecordType(schema: Schema): Boolean =
Schema.Type.RECORD.equals(schema.getType) ||
(Schema.Type.UNION.equals(schema.getType) &&
schema.getTypes.asScala.map(_.getType).contains(Schema.Type.RECORD))

private def diff(x: Option[GenericRecord], y: Option[GenericRecord], root: String): Seq[Delta] = {
// If a y exists we assume it has the superset of all fields, since x must be backwards
// compatible with it based on the SchemaValidator check in apply()
val schemaFields = (x, y) match {
case (Some(xVal), None) => xVal.getSchema.getFields.asScala.toList
case (_, Some(yVal)) => yVal.getSchema.getFields.asScala.toList
case _ => List()
}
private def isRecord(schema: Schema): Boolean = schema.getType match {
case Schema.Type.RECORD => true
case Schema.Type.UNION => schema.getTypes.asScala.map(_.getType).contains(Schema.Type.RECORD)
case _ => false
}
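
For instance, a nullable record field is represented as a union containing a RECORD branch; a small illustration (schema JSON is hypothetical):

val nullableRecord: Schema = new Schema.Parser().parse(
  """["null", {"type": "record", "name": "Inner", "fields": []}]"""
)
// nullableRecord.getType is UNION, and isRecord(nullableRecord) returns true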

schemaFields
.flatMap { f =>
val name = f.name()
val fullName = if (root.isEmpty) name else root + "." + name
getRawType(f.schema()).getType match {
case Schema.Type.RECORD =>
val a = x.flatMap(r => Option(r.get(name).asInstanceOf[GenericRecord]))
val b = y.flatMap(r => Option(r.get(name).asInstanceOf[GenericRecord]))
(a, b) match {
case (None, None) => Nil
case (Some(_), None) => Seq(Delta(fullName, a, None, UnknownDelta))
case (None, Some(_)) => Seq(Delta(fullName, None, b, UnknownDelta))
case (Some(_), Some(_)) => diff(a, b, fullName)
}
case Schema.Type.ARRAY if unordered.contains(fullName) =>
if (
unorderedFieldKeys.contains(fullName)
&& isAvroRecordType(f.schema().getElementType)
) {
val l = x
.flatMap(outer =>
Option(outer.get(name).asInstanceOf[java.util.List[GenericRecord]].asScala.toList)
)
.getOrElse(List())
.flatMap(inner =>
Try(inner.get(unorderedFieldKeys(fullName))).toOption.map(k => (k, inner))
)
.toMap
val r = y
.flatMap(outer =>
Option(outer.get(name).asInstanceOf[java.util.List[GenericRecord]].asScala.toList)
)
.getOrElse(List())
.flatMap(inner =>
Try(inner.get(unorderedFieldKeys(fullName))).toOption.map(k => (k, inner))
)
.toMap
(l.keySet ++ r.keySet).flatMap(k => diff(l.get(k), r.get(k), fullName)).toList
} else {
val a = x
.flatMap(r => Option(r.get(name).asInstanceOf[java.util.List[GenericRecord]]))
.map(sortList)
val b = y
.flatMap(r => Option(r.get(name).asInstanceOf[java.util.List[GenericRecord]]))
.map(sortList)
if (a == b) {
Nil
} else {
Seq(Delta(fullName, a, b, delta(a.orNull, b.orNull)))
}
}
case _ =>
val a = x.flatMap(r => Option(r.get(name)))
val b = y.flatMap(r => Option(r.get(name)))
if (a == b) Nil else Seq(Delta(fullName, a, b, delta(a.orNull, b.orNull)))
}
}
.filter(d => !ignore.contains(d.field))
private def isNumericType(`type`: Schema.Type): Boolean = `type` match {
case Schema.Type.INT | Schema.Type.LONG | Schema.Type.FLOAT | Schema.Type.DOUBLE => true
case _ => false
}

private def getRawType(schema: Schema): Schema = {
schema.getType match {
private def numericValue(value: AnyRef): Double = value match {
case i: java.lang.Integer => i.toDouble
Contributor:
Why .toDouble here?

Contributor Author:
Because numericDelta only supports Double.

case l: java.lang.Long => l.toDouble
case f: java.lang.Float => f.toDouble
case d: java.lang.Double => d
case _ => throw new IllegalArgumentException(s"Unsupported numeric type: ${value.getClass}")
}

private def diff(x: AnyRef, y: AnyRef, schema: Schema, field: String): Seq[Delta] = {
benkonz marked this conversation as resolved.
val deltas = schema.getType match {
case Schema.Type.UNION =>
val types = schema.getTypes
if (types.size == 2) {
if (types.get(0).getType == Schema.Type.NULL) {
types.get(1)
} else if (types.get(1).getType == Schema.Type.NULL) {
// incorrect use of Avro "nullable" but happens
types.get(0)
} else {
schema
// union, must resolve to same type
val data = SpecificData.get()
val xTypeIndex = data.resolveUnion(schema, x)
val yTypeIndex = data.resolveUnion(schema, y)
if (xTypeIndex != yTypeIndex) {
// Use Option as x or y can be null
Seq(Delta(field, Option(x), Option(y), UnknownDelta))
} else {
// same fields, refined schema
val fieldSchema = schema.getTypes.get(xTypeIndex)
diff(x, y, fieldSchema, field)
}
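
For reference, resolveUnion maps a datum to the index of the union branch it matches, which is what makes the index comparison above meaningful; a quick sketch with an illustrative union:

val union: Schema = new Schema.Parser().parse("""["null", "string", "long"]""")
val data = SpecificData.get()
data.resolveUnion(union, null) // 0
data.resolveUnion(union, "id-123") // 1
data.resolveUnion(union, java.lang.Long.valueOf(42L)) // 2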

case Schema.Type.RECORD =>
// record, compare all fields
val a = x.asInstanceOf[IndexedRecord]
val b = y.asInstanceOf[IndexedRecord]
Comment on lines +89 to +90

Member:
I like this! Seems more resource-efficient. Is there any case in which this cast could fail, though, such that we should add a catch statement?

Contributor Author:
If the schema type is a record, IndexedRecord is the least powerful abstraction we need to check equality. We were previously using GenericRecord, which extends IndexedRecord, but equality can be done on field order.
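
A quick illustration of the positional access IndexedRecord provides (field name is illustrative):

def positional(record: IndexedRecord): AnyRef = {
  val field = record.getSchema.getField("userId")
  record.get(field.pos()) // IndexedRecord exposes access by position only
}
// GenericRecord additionally allows lookup by name, e.g. record.get("userId")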

for {
f <- schema.getFields.asScala.toSeq
pos = f.pos()
name = f.name()
fullName = if (field.isEmpty) name else field + "." + name
delta <- diff(a.get(pos), b.get(pos), f.schema(), fullName)
} yield delta

case Schema.Type.ARRAY
if unorderedFieldKeys.contains(field) && isRecord(schema.getElementType) =>
// keyed array, compare like Map[String, Record]
val keyField = unorderedFieldKeys(field)
val as =
x.asInstanceOf[java.util.List[GenericRecord]].asScala.map(r => r.get(keyField) -> r).toMap
val bs =
y.asInstanceOf[java.util.List[GenericRecord]].asScala.map(r => r.get(keyField) -> r).toMap

for {
k <- (as.keySet ++ bs.keySet).toSeq
elementField = field + s"[$k]"
delta <- (as.get(k), bs.get(k)) match {
case (Some(a), Some(b)) => diff(a, b, schema.getElementType, field)
case (a, b) => Seq(Delta(field, a, b, UnknownDelta))
}
} yield delta.copy(field = delta.field.replaceFirst(field, elementField))
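
For example, to match elements of a repeated record field items by their id field rather than by position (both names are illustrative; assumes the GenericRecord coder wiring shown earlier):

val diffy = new AvroDiffy[GenericRecord](
  unordered = Set("items"),
  unorderedFieldKeys = Map("items" -> "id")
)
// matched elements are diffed as items[<id>].<field>; keys present on only one side yield an UnknownDelta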

case Schema.Type.ARRAY =>
// array, (un)ordered comparison
val xs = x.asInstanceOf[java.util.List[AnyRef]]
val ys = y.asInstanceOf[java.util.List[AnyRef]]
val (as, bs) = if (unordered.contains(field)) {
// ordered comparison
(sortList(xs).asScala, sortList(ys).asScala)
} else {
schema
// unordered
(xs.asScala, ys.asScala)
}
case _ => schema

val delta = if (as.size != bs.size) {
Some(UnknownDelta)
} else if (isNumericType(schema.getElementType.getType) && as != bs) {
Some(VectorDelta(vectorDelta(as.map(numericValue).toSeq, bs.map(numericValue).toSeq)))
} else if (as != bs) {
as.zip(bs)
.find { case (a, b) =>
a != b && diff(a, b, schema.getElementType, field).nonEmpty
}
.map(_ => UnknownDelta)
} else {
None
}
delta.map(d => Delta(field, Some(x), Some(y), d)).toSeq

case Schema.Type.MAP =>
// map, compare key set and values
val as = x.asInstanceOf[java.util.Map[CharSequence, AnyRef]].asScala.map { case (k, v) =>
k.toString -> v
}
val bs = y.asInstanceOf[java.util.Map[CharSequence, AnyRef]].asScala.map { case (k, v) =>
k.toString -> v
}

for {
k <- (as.keySet ++ bs.keySet).toSeq
elementField = field + s"[$k]"
delta <- (as.get(k), bs.get(k)) match {
case (Some(a), Some(b)) => diff(a, b, schema.getValueType, field)
case (a, b) => Seq(Delta(field, a, b, UnknownDelta))
}
} yield delta.copy(field = delta.field.replaceFirst(field, elementField))
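
To illustrate the map case with a hypothetical field attrs of type map<string, int>:

// lhs attrs: {"a" -> 1, "b" -> 2}    rhs attrs: {"a" -> 1, "b" -> 3, "c" -> 4}
// deltas:
//   attrs[b] -> NumericDelta(numericDelta(2.0, 3.0))
//   attrs[c] -> Delta("attrs[c]", None, Some(4), UnknownDelta)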

case Schema.Type.STRING =>
// string, convert to java String for equality check
val a = x.asInstanceOf[CharSequence].toString
val b = y.asInstanceOf[CharSequence].toString
val delta = if (a == b) None else Some(StringDelta(stringDelta(a, b)))
delta.map(d => Delta(field, Some(x), Some(y), d)).toSeq
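
The toString normalization matters because Avro typically decodes strings as org.apache.avro.util.Utf8, which never equals a java.lang.String; a quick sketch:

import org.apache.avro.util.Utf8

new Utf8("scio") == "scio" // false: a Utf8 never equals a String
new Utf8("scio").toString == "scio" // true once both sides are Strings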

case t if isNumericType(t) =>
// numeric, convert to Double for equality check
val a = numericValue(x)
val b = numericValue(y)
val delta = if (a == b) None else Some(NumericDelta(numericDelta(a, b)))
delta.map(d => Delta(field, Some(x), Some(y), d)).toSeq

case _ =>
// other case rely on object equality
val delta = if (x == y) None else Some(UnknownDelta)
delta.map(d => Delta(field, Some(x), Some(y), d)).toSeq
}
}

deltas.filterNot(d => ignore.contains(d.field))
}
}
@@ -34,9 +34,11 @@ import com.spotify.scio.io.ClosedTap
import com.spotify.scio.parquet.avro._
import com.spotify.scio.values.SCollection
import com.twitter.algebird._
import org.apache.avro.{Schema, SchemaCompatibility}
import org.apache.avro.generic.GenericRecord
import org.apache.avro.specific.SpecificRecordBase
import org.apache.beam.sdk.io.TextIO
import org.apache.beam.sdk.options.PipelineOptions
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding
import org.slf4j.{Logger, LoggerFactory}

@@ -249,14 +249,12 @@ object BigDiffy extends Command with Serializable {
(key, (Nil, diffType))
}
}
.map { x =>
x._2._2 match {
case DiffType.SAME => accSame.inc()
case DiffType.DIFFERENT => accDiff.inc()
case DiffType.MISSING_LHS => accMissingLhs.inc()
case DiffType.MISSING_RHS => accMissingRhs.inc()
}
x
.tap {
case (_, (_, DiffType.SAME)) => accSame.inc()
case (_, (_, DiffType.DIFFERENT)) => accDiff.inc()
case (_, (_, DiffType.MISSING_LHS)) => accMissingLhs.inc()
case (_, (_, DiffType.MISSING_RHS)) => accMissingRhs.inc()
case _ =>
}
}

@@ -608,6 +608,9 @@ object BigDiffy extends Command with Serializable {
sys.exit(1)
}

private def avroFileSchema(path: String, options: PipelineOptions): Schema =
new AvroSampler(path, conf = Some(options)).sample(1, head = true).head.getSchema

private[diffy] def avroKeyFn(keys: Seq[String]): GenericRecord => MultiKey = {
@tailrec
def get(xs: Array[String], i: Int, r: GenericRecord): String =
@@ -745,13 +748,30 @@ val result = inputMode match {
val result = inputMode match {
case "avro" =>
if (rowRestriction.isDefined) {
throw new IllegalArgumentException(s"rowRestriction cannot be passed for avro inputs")
throw new IllegalArgumentException("rowRestriction cannot be passed for avro inputs")
}

val lhsSchema = avroFileSchema(lhs, sc.options)
val rhsSchema = avroFileSchema(rhs, sc.options)

val lhsReader = SchemaCompatibility.checkReaderWriterCompatibility(lhsSchema, rhsSchema)
val rhsReader = SchemaCompatibility.checkReaderWriterCompatibility(rhsSchema, lhsSchema)

import SchemaCompatibility.SchemaCompatibilityType._
val schema = (lhsReader.getType, rhsReader.getType) match {
case (COMPATIBLE, COMPATIBLE) =>
if (lhsSchema != rhsSchema) {
logger.warn("Avro schemas are compatible, but not equal. Using schema from {}", rhs)
}
rhsSchema
case (COMPATIBLE, INCOMPATIBLE) =>
Contributor:
This is a change in underlying functionality; IMO we should also warn in these cases rather than proceed transparently.

Contributor:
Honestly, I still think we should enforce RHS backwards compatibility unless otherwise shown to be necessary, but if we are changing functionality/flexibility then we need to do so in a way that is transparent to users.

I'll leave the actual decision here to the current members of the owning team.

Contributor Author:
Reverted to previous behavior using the SchemaValidatorBuilder, which throws a SchemaValidationException with a detailed error in case of incompatibility.

lhsSchema
case (INCOMPATIBLE, COMPATIBLE) =>
rhsSchema
case _ =>
throw new IllegalArgumentException("Avro schemas are incompatible")
}
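
For intuition, here is how the two-way check behaves when one side adds a nullable field with a default (schemas are illustrative):

val v1 = new Schema.Parser().parse(
  """{"type": "record", "name": "R", "fields": [{"name": "id", "type": "string"}]}"""
)
val v2 = new Schema.Parser().parse(
  """{"type": "record", "name": "R", "fields": [
    {"name": "id", "type": "string"},
    {"name": "x", "type": ["null", "string"], "default": null}]}"""
)
SchemaCompatibility.checkReaderWriterCompatibility(v2, v1).getType // COMPATIBLE: missing x falls back to its default
SchemaCompatibility.checkReaderWriterCompatibility(v1, v2).getType // COMPATIBLE: the extra writer field is skipped
// both directions compatible but schemas unequal: the (COMPATIBLE, COMPATIBLE) branch warns and uses the RHS schema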

val schema = new AvroSampler(rhs, conf = Some(sc.options))
.sample(1, head = true)
.head
.getSchema
implicit val grCoder: Coder[GenericRecord] = avroGenericRecordCoder(schema)
val diffy = new AvroDiffy[GenericRecord](ignore, unordered, unorderedKeys)
val lhsSCollection = sc.avroFile(lhs, schema)
@@ -760,7 +780,7 @@ object BigDiffy extends Command with Serializable {
.diff[GenericRecord](lhsSCollection, rhsSCollection, diffy, avroKeyFn(keys), ignoreNan)
case "parquet" =>
if (rowRestriction.isDefined) {
throw new IllegalArgumentException(s"rowRestriction cannot be passed for Parquet inputs")
throw new IllegalArgumentException("rowRestriction cannot be passed for Parquet inputs")
}
val compatSchema = ParquetIO.getCompatibleSchemaForFiles(lhs, rhs)
val diffy = new AvroDiffy[GenericRecord](ignore, unordered, unorderedKeys)(