Omit parquet field for a schema with no nested fields
Snowplow users commonly use schemas with no nested fields. Previously, we
created a string column and loaded the literal string `{}` into it. There
is no benefit to loading this redundant data.

Omitting the column for these schemas means we support schema evolution
if the user ever adds a nested field to the empty schema.

For empty schemas with `additionalProperties: true` we retain the old
behaviour of loading the original JSON as a string.
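
For illustration (these example schemas are hypothetical, not taken from the commit), the two kinds of "empty" schema now behave differently:

```scala
// An object schema with no nested fields and closed additional properties:
val emptyClosed = """{"type": "object", "properties": {}, "additionalProperties": false}"""
// -> no Parquet column is created; if a later schema version adds a nested
//    field, the column appears through ordinary schema evolution.

// The same empty schema, but open to arbitrary extra keys:
val emptyOpen = """{"type": "object", "properties": {}, "additionalProperties": true}"""
// -> old behaviour: the original JSON is loaded into a string column.
```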
istreeter committed Apr 20, 2024
1 parent 8b230ae commit 19a1f06
Showing 12 changed files with 257 additions and 103 deletions.
1 change: 1 addition & 0 deletions build.sbt
@@ -125,6 +125,7 @@ lazy val loadersCommon: Project = project
   .settings(BuildSettings.publishSettings)
   .settings(BuildSettings.mimaSettings)
   .settings(BuildSettings.docsSettings)
+  .settings(BuildSettings.igluTestSettings)
   .settings(
     previewFixedPort := Some(9999),
     Preprocess / preprocessVars := Map("VERSION" -> version.value)
AtomicFields.scala
@@ -22,8 +22,8 @@ object AtomicFields {
    *
    * Does not include fields added by the loader, e.g. `load_tstamp`
    */
-  val static: List[Field] =
-    List(
+  val static: Vector[Field] =
+    Vector(
       Field("app_id", Type.String, Nullable),
       Field("platform", Type.String, Nullable),
       Field("etl_tstamp", Type.Timestamp, Nullable),
@@ -154,6 +154,6 @@ object AtomicFields {
       Field("true_tstamp", Type.Timestamp, Nullable)
     )
 
-  val withLoadTstamp: List[Field] =
+  val withLoadTstamp: Vector[Field] =
     static :+ Field("load_tstamp", Type.Timestamp, Required)
 }
NonAtomicFields.scala
@@ -69,16 +69,18 @@ object NonAtomicFields {
           // We can't do it sooner based on a result of `fetchSchemasWithSameModel` because we can't have 'holes' in Iglu schema family when building typed entities.
           // Otherwise we may end up with invalid and incompatible merged schema model.
           // Here `TypedTabledEntity` is already properly created using contiguous series of schemas, so we can try to skip some sub-versions.
-          .map { typedTabledEntity =>
-            val filteredSubVersions = filterSubVersions(filterCriteria, typedTabledEntity.tabledEntity, typedTabledEntity.mergedVersions)
-            typedTabledEntity.copy(mergedVersions = filteredSubVersions)
+          .map {
+            _.map { typedTabledEntity =>
+              val filteredSubVersions = filterSubVersions(filterCriteria, typedTabledEntity.tabledEntity, typedTabledEntity.mergedVersions)
+              typedTabledEntity.copy(mergedVersions = filteredSubVersions)
+            }
           }
           .leftMap(ColumnFailure(tabledEntity, subVersions, _))
           .value
       }
       .map { eithers =>
         val (failures, good) = eithers.separate
-        Result(good, failures.toList)
+        Result(good.flatten, failures.toList)
       }
 
   private def filterSubVersions(
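
Because `build` now returns `Option[TypedTabledEntity]` (see the TypedTabledEntity.scala diff below), each either in this pipeline carries an option: after `separate`, `good` is a vector of options, and `flatten` discards the entities that were omitted. A minimal standalone sketch of that shape, with `String` and `Int` standing in for `ColumnFailure` and `TypedTabledEntity`:

```scala
import cats.implicits._

val eithers: Vector[Either[String, Option[Int]]] =
  Vector(Right(Some(1)), Left("bad column"), Right(None), Right(Some(2)))

// separate splits the lefts from the rights:
val (failures, good) = eithers.separate
// failures == Vector("bad column")
// good     == Vector(Some(1), None, Some(2))

// flatten drops the None produced by a schema with no nested fields:
assert(good.flatten == Vector(1, 2))
```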
SchemaProvider.scala
@@ -7,7 +7,7 @@
  */
 package com.snowplowanalytics.snowplow.loaders.transform
 
-import cats.data.{EitherT, NonEmptyList}
+import cats.data.{EitherT, NonEmptyVector}
 import cats.effect.Sync
 import cats.implicits._
 import com.snowplowanalytics.iglu.client.{ClientError, Resolver}
@@ -35,15 +35,15 @@ private[transform] object SchemaProvider {
   def fetchSchemasWithSameModel[F[_]: Sync: RegistryLookup](
     resolver: Resolver[F],
     schemaKey: SchemaKey
-  ): EitherT[F, FailureDetails.LoaderIgluError, NonEmptyList[SelfDescribingSchema[Schema]]] =
+  ): EitherT[F, FailureDetails.LoaderIgluError, NonEmptyVector[SelfDescribingSchema[Schema]]] =
     for {
       schemaKeys <- EitherT(resolver.listSchemasLike(schemaKey))
                       .leftMap(resolverFetchBadRow(schemaKey.vendor, schemaKey.name, schemaKey.format, schemaKey.version.model))
-                      .map(_.schemas)
+                      .map(_.schemas.toVector)
+      schemaKeys <- EitherT.rightT[F, FailureDetails.LoaderIgluError](schemaKeys.filter(_ < schemaKey))
       topSchema <- getSchema(resolver, schemaKey)
-      lowerSchemas <- schemaKeys.filter(_ < schemaKey).traverse(getSchema(resolver, _))
+      lowerSchemas <- schemaKeys.traverse(getSchema(resolver, _))
-    } yield NonEmptyList(topSchema, lowerSchemas)
+    } yield NonEmptyVector(topSchema, lowerSchemas)
 
   private def resolverFetchBadRow(
     vendor: String,
     name: String,
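
The repeated `schemaKeys <-` binding is ordinary shadowing: each `<-` introduces a fresh name, so the filtered vector replaces the raw listing for the rest of the comprehension. A small self-contained sketch of the same pattern:

```scala
import cats.data.EitherT
import cats.effect.IO

def demo: EitherT[IO, String, Vector[Int]] =
  for {
    xs <- EitherT.rightT[IO, String](Vector(3, 1, 2))
    // Shadows the previous xs, mirroring the schemaKeys rebinding above.
    xs <- EitherT.rightT[IO, String](xs.filter(_ < 3))
  } yield xs // Vector(1, 2)
```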
TypedTabledEntity.scala
@@ -7,7 +7,7 @@
  */
 package com.snowplowanalytics.snowplow.loaders.transform
 
-import cats.data.NonEmptyList
+import cats.data.NonEmptyVector
 import cats.implicits._
 import io.circe.syntax._
 import com.snowplowanalytics.iglu.core.{SchemaKey, SelfDescribingSchema}
@@ -56,38 +56,43 @@ object TypedTabledEntity {
   private[transform] def build(
     tabledEntity: TabledEntity,
     subVersions: Set[SchemaSubVersion],
-    schemas: NonEmptyList[SelfDescribingSchema[Schema]]
-  ): TypedTabledEntity = {
+    schemas: NonEmptyVector[SelfDescribingSchema[Schema]]
+  ): Option[TypedTabledEntity] =
     // Schemas need to be ordered by key to merge in correct order.
-    val NonEmptyList(root, tail) = schemas.sorted
-    val tte = tail
-      .foldLeft(initColumnGroup(tabledEntity, root)) { case (columnGroup, selfDescribingSchema) =>
-        val field = fieldFromSchema(tabledEntity, selfDescribingSchema.schema)
-        val schemaKey = selfDescribingSchema.self.schemaKey
-        val subversion = keyToSubVersion(schemaKey)
-        Migrations.mergeSchemas(columnGroup.mergedField, field) match {
-          case Left(_) =>
-            if (subVersions.contains(subversion)) {
-              val hash = "%08x".format(selfDescribingSchema.schema.asJson.noSpaces.hashCode())
-              // typedField always has a single element in matchingKeys
-              val recoverPoint = schemaKey.version.asString.replaceAll("-", "_")
-              val newName = s"${field.name}_recovered_${recoverPoint}_$hash"
-              columnGroup.copy(recoveries = (subversion -> field.copy(name = newName)) :: columnGroup.recoveries)
-            } else {
-              // do not create a recovered column if that type were not in the batch
-              columnGroup
-            }
-          case Right(mergedField) =>
-            columnGroup.copy(mergedField = mergedField, mergedVersions = columnGroup.mergedVersions + subversion)
-        }
-      }
-    tte.copy(recoveries = tte.recoveries.reverse)
-  }
+    schemas.sorted.toVector
+      .flatMap { case sds =>
+        fieldFromSchema(tabledEntity, sds.schema).map((_, sds))
+      }
+      .toNev
+      .map { nev =>
+        val (rootField, rootSchema) = nev.head
+        val tte = TypedTabledEntity(tabledEntity, rootField, Set(keyToSubVersion(rootSchema.self.schemaKey)), Nil)
+        nev.tail
+          .foldLeft(tte) { case (columnGroup, (field, selfDescribingSchema)) =>
+            val schemaKey = selfDescribingSchema.self.schemaKey
+            val subversion = keyToSubVersion(schemaKey)
+            Migrations.mergeSchemas(columnGroup.mergedField, field) match {
+              case Left(_) =>
+                if (subVersions.contains(subversion)) {
+                  val hash = "%08x".format(selfDescribingSchema.schema.asJson.noSpaces.hashCode())
+                  // typedField always has a single element in matchingKeys
+                  val recoverPoint = schemaKey.version.asString.replaceAll("-", "_")
+                  val newName = s"${field.name}_recovered_${recoverPoint}_$hash"
+                  columnGroup.copy(recoveries = (subversion -> field.copy(name = newName)) :: columnGroup.recoveries)
+                } else {
+                  // do not create a recovered column if that type were not in the batch
+                  columnGroup
+                }
+              case Right(mergedField) =>
+                columnGroup.copy(mergedField = mergedField, mergedVersions = columnGroup.mergedVersions + subversion)
+            }
+          }
+      }
+      .map { tte =>
+        tte.copy(recoveries = tte.recoveries.reverse)
+      }
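
`build` now yields `None` when no schema in the family produces a field, which is what lets the loader omit the column entirely. The pivot is `toNev`, the cats syntax that converts a `Vector` to an optional `NonEmptyVector`, sketched standalone here:

```scala
import cats.implicits._

// Vector#toNev is Some(NonEmptyVector(...)) for a non-empty vector, None otherwise.
// In build, if every fieldFromSchema call returns None, the flatMap leaves an
// empty vector and toNev turns the whole entity into None.
assert(Vector.empty[Int].toNev.isEmpty)
assert(Vector(1, 2).toNev.map(_.head) == Some(1))
```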

-  private def initColumnGroup(tabledEntity: TabledEntity, root: SelfDescribingSchema[Schema]): TypedTabledEntity =
-    TypedTabledEntity(tabledEntity, fieldFromSchema(tabledEntity, root.schema), Set(keyToSubVersion(root.self.schemaKey)), Nil)
-
-  private def fieldFromSchema(tabledEntity: TabledEntity, schema: Schema): Field = {
+  private def fieldFromSchema(tabledEntity: TabledEntity, schema: Schema): Option[Field] = {
     val sdkEntityType = tabledEntity.entityType match {
       case TabledEntity.UnstructEvent => SdkData.UnstructEvent
       case TabledEntity.Context => SdkData.Contexts(SdkData.CustomContexts)
@@ -96,33 +101,30 @@
 
     val field = tabledEntity.entityType match {
       case TabledEntity.UnstructEvent =>
-        Field.normalize {
-          Field.build(fieldName, schema, enforceValuePresence = false)
-        }
+        Field.build(fieldName, schema, enforceValuePresence = false).map(Field.normalize(_))
       case TabledEntity.Context =>
         // Must normalize first and add the schema key field after. To avoid unlikely weird issues
         // with similar existing keys.
-        addSchemaVersionKey {
-          Field.normalize {
-            Field.buildRepeated(fieldName, schema, enforceItemPresence = true, Type.Nullability.Nullable)
-          }
-        }
+        Field
+          .buildRepeated(fieldName, schema, enforceItemPresence = true, Type.Nullability.Nullable)
+          .map(Field.normalize(_))
+          .map(addSchemaVersionKey(_))
     }
 
     // Accessors are meaningless for a schema's top-level field
-    field.copy(accessors = Set.empty)
+    field.map(_.copy(accessors = Set.empty))
   }
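
With `fieldFromSchema` returning `Option[Field]`, a schema with no nested fields produces `None` and every later step is lifted through `map`. A simplified, self-contained sketch of that propagation (stand-in types, not the real schema-ddl API):

```scala
// Stand-in for the real Field type, for illustration only.
final case class F(name: String)

def build(schemaHasFields: Boolean): Option[F] =
  if (schemaHasFields) Some(F("Contexts_Com_Example_1")) else None
def normalize(f: F): F = f.copy(name = f.name.toLowerCase)
def addVersionKey(f: F): F = f // the real code injects _schema_version here

// Some(...) flows through every step; None short-circuits, so the caller
// omits the column for this entity.
assert(build(schemaHasFields = false).map(normalize).map(addVersionKey).isEmpty)
assert(build(schemaHasFields = true).map(normalize).map(addVersionKey).map(_.name)
  .contains("contexts_com_example_1"))
```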

private def keyToSubVersion(key: SchemaKey): SchemaSubVersion = (key.version.revision, key.version.addition)

   private def addSchemaVersionKey(field: Field): Field = {
     val fieldType = field.fieldType match {
       case arr @ Type.Array(struct @ Type.Struct(subFields), _) =>
-        val fixedFields = // Our special key takes priority over a key of the same name in the schema
-          Field("_schema_version", Type.String, Type.Nullability.Required) +: subFields.filter(
-            _.name =!= "_schema_version"
-          )
-        arr.copy(element = struct.copy(fields = fixedFields))
+        val head = Field("_schema_version", Type.String, Type.Nullability.Required)
+        val tail = subFields.filter( // Our special key takes priority over a key of the same name in the schema
+          _.name =!= "_schema_version"
+        )
+        arr.copy(element = struct.copy(fields = NonEmptyVector(head, tail)))
       case other =>
         // This is OK. It must be a weird schema, whose root type is not an object.
         // Unlikely but allowed according to our rules.
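
The rebuilt struct keeps the injected `_schema_version` at the head and filters any same-named key out of the tail; a tiny sketch with `String` standing in for `Field`:

```scala
import cats.data.NonEmptyVector

// A schema that (oddly) defines its own _schema_version key:
val subFields = Vector("_schema_version", "user_id")

val head = "_schema_version" // injected key, Required in the real code
val tail = subFields.filter(_ != "_schema_version")

// The injected key wins; the schema's own copy is dropped:
assert(NonEmptyVector(head, tail) == NonEmptyVector("_schema_version", Vector("user_id")))
```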