Omit parquet field for a schema with no nested fields
Snowplow users commonly use schemas with no nested fields. Previously,
we created a string column for such schemas and loaded the literal
string `{}` into it, even though there is no benefit to loading this
redundant data.

Omitting the column for these schemas also means we support schema
evolution if the user ever adds a nested field to the previously empty
schema.

For empty schemas with `additionalProperties: true` we retain the old
behaviour of loading the original JSON as a string.
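The rule the commit message describes can be pictured with a short sketch. This is illustrative only: `ColumnPlan`, `planFor`, and the `Map`-based stand-in for a schema are hypothetical, not the loader's real types.

```scala
// Illustrative sketch only, not the loader's actual code.
sealed trait ColumnPlan
case object NoColumn extends ColumnPlan                      // omit the column entirely
case object JsonStringColumn extends ColumnPlan              // keep original JSON as a string
final case class StructColumn(fieldNames: List[String]) extends ColumnPlan

def planFor(properties: Map[String, String], additionalProperties: Boolean): ColumnPlan =
  if (properties.nonEmpty) StructColumn(properties.keys.toList) // normal case: nested fields
  else if (additionalProperties) JsonStringColumn               // old behaviour retained
  else NoColumn                                                 // new behaviour: nothing to load
```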
istreeter committed Apr 20, 2024
1 parent 8b230ae commit 39d1b71
Showing 9 changed files with 133 additions and 89 deletions.
AtomicFields.scala
@@ -22,8 +22,8 @@ object AtomicFields {
    *
    * Does not include fields added by the loader, e.g. `load_tstamp`
    */
-  val static: List[Field] =
-    List(
+  val static: Vector[Field] =
+    Vector(
       Field("app_id", Type.String, Nullable),
       Field("platform", Type.String, Nullable),
       Field("etl_tstamp", Type.Timestamp, Nullable),
@@ -154,6 +154,6 @@ object AtomicFields {
       Field("true_tstamp", Type.Timestamp, Nullable)
     )

-  val withLoadTstamp: List[Field] =
+  val withLoadTstamp: Vector[Field] =
     static :+ Field("load_tstamp", Type.Timestamp, Required)
 }
NonAtomicFields.scala
@@ -69,16 +69,18 @@ object NonAtomicFields {
           // We can't do it sooner based on a result of `fetchSchemasWithSameModel` because we can't have 'holes' in Iglu schema family when building typed entities.
           // Otherwise we may end up with invalid and incompatible merged schema model.
           // Here `TypedTabledEntity` is already properly created using contiguous series of schemas, so we can try to skip some sub-versions.
-          .map { typedTabledEntity =>
-            val filteredSubVersions = filterSubVersions(filterCriteria, typedTabledEntity.tabledEntity, typedTabledEntity.mergedVersions)
-            typedTabledEntity.copy(mergedVersions = filteredSubVersions)
+          .map {
+            _.map { typedTabledEntity =>
+              val filteredSubVersions = filterSubVersions(filterCriteria, typedTabledEntity.tabledEntity, typedTabledEntity.mergedVersions)
+              typedTabledEntity.copy(mergedVersions = filteredSubVersions)
+            }
           }
           .leftMap(ColumnFailure(tabledEntity, subVersions, _))
           .value
       }
       .map { eithers =>
         val (failures, good) = eithers.separate
-        Result(good, failures.toList)
+        Result(good.flatten, failures.toList)
       }

   private def filterSubVersions(
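Because `build` (below, in TypedTabledEntity.scala) now returns an `Option`, the successful results are flattened before constructing the `Result`. A minimal sketch of the cats `separate` plus `flatten` pattern, using simplified stand-in types (failures as `String`, successes as `Option[Int]`):

```scala
import cats.implicits._

// None plays the role of an entity whose schema maps to no column.
val eithers: List[Either[String, Option[Int]]] =
  List(Right(Some(1)), Left("iglu lookup failed"), Right(None))

val (failures, good) = eithers.separate
// failures == List("iglu lookup failed")
// good     == List(Some(1), None)
val columns = good.flatten
// columns  == List(1) -- the None is dropped rather than loaded as `{}`
```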
SchemaProvider.scala
@@ -7,7 +7,7 @@
  */
 package com.snowplowanalytics.snowplow.loaders.transform

-import cats.data.{EitherT, NonEmptyList}
+import cats.data.{EitherT, NonEmptyVector}
 import cats.effect.Sync
 import cats.implicits._
 import com.snowplowanalytics.iglu.client.{ClientError, Resolver}
@@ -35,15 +35,15 @@ private[transform] object SchemaProvider {
   def fetchSchemasWithSameModel[F[_]: Sync: RegistryLookup](
     resolver: Resolver[F],
     schemaKey: SchemaKey
-  ): EitherT[F, FailureDetails.LoaderIgluError, NonEmptyList[SelfDescribingSchema[Schema]]] =
+  ): EitherT[F, FailureDetails.LoaderIgluError, NonEmptyVector[SelfDescribingSchema[Schema]]] =
     for {
       schemaKeys <- EitherT(resolver.listSchemasLike(schemaKey))
                       .leftMap(resolverFetchBadRow(schemaKey.vendor, schemaKey.name, schemaKey.format, schemaKey.version.model))
-                      .map(_.schemas)
+                      .map(_.schemas.toVector)
+      schemaKeys <- EitherT.rightT[F, FailureDetails.LoaderIgluError](schemaKeys.filter(_ < schemaKey))
       topSchema <- getSchema(resolver, schemaKey)
       lowerSchemas <- schemaKeys.filter(_ < schemaKey).traverse(getSchema(resolver, _))
-    } yield NonEmptyList(topSchema, lowerSchemas)
+    } yield NonEmptyVector(topSchema, lowerSchemas)

   private def resolverFetchBadRow(
     vendor: String,
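For reference, `NonEmptyVector(head, tail)` pairs a head element with a `Vector` tail, directly mirroring the `NonEmptyList(head, tail)` constructor it replaces. A small usage sketch, with placeholder version strings standing in for the fetched schemas:

```scala
import cats.data.NonEmptyVector

// Placeholder version strings, not real schema values.
val lowerSchemas: Vector[String] = Vector("1-0-0", "1-0-1")
val all: NonEmptyVector[String]  = NonEmptyVector("1-1-0", lowerSchemas)
// all.head == "1-1-0"
// all.toVector == Vector("1-1-0", "1-0-0", "1-0-1")
```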
TypedTabledEntity.scala
@@ -7,7 +7,7 @@
  */
 package com.snowplowanalytics.snowplow.loaders.transform

-import cats.data.NonEmptyList
+import cats.data.NonEmptyVector
 import cats.implicits._
 import io.circe.syntax._
 import com.snowplowanalytics.iglu.core.{SchemaKey, SelfDescribingSchema}
@@ -56,38 +56,43 @@ object TypedTabledEntity {
   private[transform] def build(
     tabledEntity: TabledEntity,
     subVersions: Set[SchemaSubVersion],
-    schemas: NonEmptyList[SelfDescribingSchema[Schema]]
-  ): TypedTabledEntity = {
+    schemas: NonEmptyVector[SelfDescribingSchema[Schema]]
+  ): Option[TypedTabledEntity] =
     // Schemas need to be ordered by key to merge in correct order.
-    val NonEmptyList(root, tail) = schemas.sorted
-    val tte = tail
-      .foldLeft(initColumnGroup(tabledEntity, root)) { case (columnGroup, selfDescribingSchema) =>
-        val field = fieldFromSchema(tabledEntity, selfDescribingSchema.schema)
-        val schemaKey = selfDescribingSchema.self.schemaKey
-        val subversion = keyToSubVersion(schemaKey)
-        Migrations.mergeSchemas(columnGroup.mergedField, field) match {
-          case Left(_) =>
-            if (subVersions.contains(subversion)) {
-              val hash = "%08x".format(selfDescribingSchema.schema.asJson.noSpaces.hashCode())
-              // typedField always has a single element in matchingKeys
-              val recoverPoint = schemaKey.version.asString.replaceAll("-", "_")
-              val newName = s"${field.name}_recovered_${recoverPoint}_$hash"
-              columnGroup.copy(recoveries = (subversion -> field.copy(name = newName)) :: columnGroup.recoveries)
-            } else {
-              // do not create a recovered column if that type were not in the batch
-              columnGroup
-            }
-          case Right(mergedField) =>
-            columnGroup.copy(mergedField = mergedField, mergedVersions = columnGroup.mergedVersions + subversion)
-        }
-      }
-    tte.copy(recoveries = tte.recoveries.reverse)
-  }
+    schemas.sorted.toVector
+      .flatMap { case sds =>
+        fieldFromSchema(tabledEntity, sds.schema).map((_, sds))
+      }
+      .toNev
+      .map { nev =>
+        val (rootField, rootSchema) = nev.head
+        val tte = TypedTabledEntity(tabledEntity, rootField, Set(keyToSubVersion(rootSchema.self.schemaKey)), Nil)
+        nev.tail
+          .foldLeft(tte) { case (columnGroup, (field, selfDescribingSchema)) =>
+            val schemaKey = selfDescribingSchema.self.schemaKey
+            val subversion = keyToSubVersion(schemaKey)
+            Migrations.mergeSchemas(columnGroup.mergedField, field) match {
+              case Left(_) =>
+                if (subVersions.contains(subversion)) {
+                  val hash = "%08x".format(selfDescribingSchema.schema.asJson.noSpaces.hashCode())
+                  // typedField always has a single element in matchingKeys
+                  val recoverPoint = schemaKey.version.asString.replaceAll("-", "_")
+                  val newName = s"${field.name}_recovered_${recoverPoint}_$hash"
+                  columnGroup.copy(recoveries = (subversion -> field.copy(name = newName)) :: columnGroup.recoveries)
+                } else {
+                  // do not create a recovered column if that type were not in the batch
+                  columnGroup
+                }
+              case Right(mergedField) =>
+                columnGroup.copy(mergedField = mergedField, mergedVersions = columnGroup.mergedVersions + subversion)
+            }
+          }
+      }
+      .map { tte =>
+        tte.copy(recoveries = tte.recoveries.reverse)
+      }

-  private def initColumnGroup(tabledEntity: TabledEntity, root: SelfDescribingSchema[Schema]): TypedTabledEntity =
-    TypedTabledEntity(tabledEntity, fieldFromSchema(tabledEntity, root.schema), Set(keyToSubVersion(root.self.schemaKey)), Nil)
-
-  private def fieldFromSchema(tabledEntity: TabledEntity, schema: Schema): Field = {
+  private def fieldFromSchema(tabledEntity: TabledEntity, schema: Schema): Option[Field] = {
     val sdkEntityType = tabledEntity.entityType match {
       case TabledEntity.UnstructEvent => SdkData.UnstructEvent
       case TabledEntity.Context => SdkData.Contexts(SdkData.CustomContexts)
@@ -96,33 +101,30 @@ object TypedTabledEntity {

     val field = tabledEntity.entityType match {
       case TabledEntity.UnstructEvent =>
-        Field.normalize {
-          Field.build(fieldName, schema, enforceValuePresence = false)
-        }
+        Field.build(fieldName, schema, enforceValuePresence = false).map(Field.normalize(_))
       case TabledEntity.Context =>
         // Must normalize first and add the schema key field after. To avoid unlikely weird issues
         // with similar existing keys.
-        addSchemaVersionKey {
-          Field.normalize {
-            Field.buildRepeated(fieldName, schema, enforceItemPresence = true, Type.Nullability.Nullable)
-          }
-        }
+        Field
+          .buildRepeated(fieldName, schema, enforceItemPresence = true, Type.Nullability.Nullable)
+          .map(Field.normalize(_))
+          .map(addSchemaVersionKey(_))
     }

     // Accessors are meaningless for a schema's top-level field
-    field.copy(accessors = Set.empty)
+    field.map(_.copy(accessors = Set.empty))
   }

   private def keyToSubVersion(key: SchemaKey): SchemaSubVersion = (key.version.revision, key.version.addition)

   private def addSchemaVersionKey(field: Field): Field = {
     val fieldType = field.fieldType match {
       case arr @ Type.Array(struct @ Type.Struct(subFields), _) =>
-        val fixedFields = // Our special key takes priority over a key of the same name in the schema
-          Field("_schema_version", Type.String, Type.Nullability.Required) +: subFields.filter(
-            _.name =!= "_schema_version"
-          )
-        arr.copy(element = struct.copy(fields = fixedFields))
+        val head = Field("_schema_version", Type.String, Type.Nullability.Required)
+        val tail = subFields.filter( // Our special key takes priority over a key of the same name in the schema
+          _.name =!= "_schema_version"
+        )
+        arr.copy(element = struct.copy(fields = NonEmptyVector(head, tail)))
       case other =>
         // This is OK. It must be a weird schema, whose root type is not an object.
         // Unlikely but allowed according to our rules.
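The new `build` relies on a drop-then-require-nonempty pattern: schemas whose `fieldFromSchema` returns `None` are filtered out by `flatMap`, and cats' `toNev` syntax for `Vector` turns the survivors into an `Option[NonEmptyVector[...]]`, so an entity with no loadable columns is simply not built. A sketch with simplified stand-in types (`fieldFor` is hypothetical):

```scala
import cats.data.NonEmptyVector
import cats.syntax.all._ // provides Vector#toNev

// Hypothetical stand-in for fieldFromSchema: an empty "schema" yields no field.
def fieldFor(schema: String): Option[String] =
  if (schema.isEmpty) None else Some(s"field_$schema")

val schemas = Vector("a", "", "b")
val result: Option[NonEmptyVector[String]] =
  schemas.flatMap(s => fieldFor(s).toList).toNev
// result == Some(NonEmptyVector.of("field_a", "field_b"))
// Vector("", "") would give None: no schema maps to a column, so no entity is built.
```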
NonAtomicFieldsSpec.scala
@@ -8,6 +8,7 @@
 package com.snowplowanalytics.snowplow.loaders.transform

 import cats.effect.IO
+import cats.data.NonEmptyVector
 import org.specs2.Specification
 import cats.effect.testing.specs2.CatsEffect
 import com.snowplowanalytics.iglu.client.Resolver
@@ -55,7 +56,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {

     val expected = {
       val expectedStruct = Type.Struct(
-        List(
+        NonEmptyVector.of(
           Field("col_a", Type.String, Required)
         )
       )
@@ -88,7 +89,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {

     val expected = {
       val expectedStruct = Type.Struct(
-        List(
+        NonEmptyVector.of(
           Field("col_a", Type.String, Required),
           Field("col_c", Type.String, Nullable),
           Field("col_b", Type.String, Nullable)
@@ -123,7 +124,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {

     val expected = {
       val expectedStruct = Type.Struct(
-        List(
+        NonEmptyVector.of(
           Field("col_a", Type.String, Required),
           Field("col_c", Type.String, Nullable),
           Field("col_b", Type.String, Nullable)
@@ -158,7 +159,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {

     val expected = {
       val expectedStruct = Type.Struct(
-        List(
+        NonEmptyVector.of(
           Field("_schema_version", Type.String, Required),
           Field("col_a", Type.String, Required)
         )
@@ -194,7 +195,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {

     val expected = {
       val expectedStruct = Type.Struct(
-        List(
+        NonEmptyVector.of(
           Field("_schema_version", Type.String, Required),
           Field("col_a", Type.String, Required),
           Field("col_c", Type.String, Nullable),
@@ -233,7 +234,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {

     val expected = {

       val expectedStruct = Type.Struct(
-        List(
+        NonEmptyVector.of(
           Field("_schema_version", Type.String, Required),
           Field("col_a", Type.String, Required),
           Field("col_c", Type.String, Nullable),
@@ -289,7 +290,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {

     val expected = {
       val expectedStruct = Type.Struct(
-        List(
+        NonEmptyVector.of(
           Field("col_a", Type.String, Required)
         )
       )
@@ -322,19 +323,19 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {

     val expected = {
       val expectedStruct = Type.Struct(
-        List(
+        NonEmptyVector.of(
           Field("col_a", Type.String, Required)
         )
       )

       val recoveryStruct1 = Type.Struct(
-        List(
+        NonEmptyVector.of(
           Field("col_a", Type.Double, Required)
         )
       )

       val recoveryStruct2 = Type.Struct(
-        List(
+        NonEmptyVector.of(
           Field("col_a", Type.Boolean, Required)
         )
       )
@@ -372,20 +373,20 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {

     val expected = {
       val expectedStruct = Type.Struct(
-        List(
+        NonEmptyVector.of(
           Field("col_a", Type.String, Required),
           Field("col_b", Type.Long, Nullable)
         )
       )

       val recoveryStruct1 = Type.Struct(
-        List(
+        NonEmptyVector.of(
           Field("col_a", Type.Double, Required)
         )
       )

       val recoveryStruct2 = Type.Struct(
-        List(
+        NonEmptyVector.of(
           Field("col_a", Type.Boolean, Required)
         )
       )
TestCaster.scala
@@ -7,6 +7,7 @@
  */
 package com.snowplowanalytics.snowplow.loaders.transform

+import cats.data.NonEmptyVector
 import io.circe.{Json, JsonObject}

 import java.time.{Instant, LocalDate}
@@ -27,7 +28,7 @@ object TestCaster extends Caster[Json] {
   override def timestampValue(v: Instant): Json = Json.fromString(v.toString)
   override def dateValue(v: LocalDate): Json = Json.fromString(v.toString)
   override def arrayValue(vs: List[Json]): Json = Json.fromValues(vs)
-  override def structValue(vs: List[Caster.NamedValue[Json]]): Json =
-    Json.fromJsonObject(JsonObject.fromIterable(vs.map(nv => nv.name -> nv.value)))
+  override def structValue(vs: NonEmptyVector[Caster.NamedValue[Json]]): Json =
+    Json.fromJsonObject(JsonObject.fromIterable(vs.toVector.map(nv => nv.name -> nv.value)))

 }
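A usage sketch of the circe calls involved: `JsonObject.fromIterable` accepts name/value pairs, so the `NonEmptyVector` only needs `.toVector` first. The field names here are made up for illustration.

```scala
import cats.data.NonEmptyVector
import io.circe.{Json, JsonObject}

val fields = NonEmptyVector.of("a" -> Json.fromInt(1), "b" -> Json.fromBoolean(true))
val obj: Json = Json.fromJsonObject(JsonObject.fromIterable(fields.toVector))
// obj.noSpaces == """{"a":1,"b":true}"""
```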