From 973d66259245896ce7af017f264c0d7f89fe310d Mon Sep 17 00:00:00 2001
From: Ian Streeter
Date: Wed, 8 May 2024 15:57:57 +0100
Subject: [PATCH] Omit parquet field for a schema with no nested fields (#74)

Snowplow users commonly use schemas with no nested fields. Previously, we
were creating a string column and loading the string `{}` into it. But there
is no benefit to loading this redundant data. Omitting the column for these
schemas means we support schema evolution if the user ever adds a nested
field to the empty schema.

For empty schemas with `additionalProperties: true` we retain the old
behaviour of loading the original JSON as a string.
---
 build.sbt                                     |   1 +
 .../loaders/transform/AtomicFields.scala      |   6 +-
 .../loaders/transform/NonAtomicFields.scala   |  10 +-
 .../loaders/transform/SchemaProvider.scala    |   8 +-
 .../loaders/transform/TypedTabledEntity.scala |  88 ++++++------
 .../transform/NonAtomicFieldsSpec.scala       | 125 ++++++++++++++++--
 .../transform/SchemaProviderSpec.scala        |  20 +--
 .../transform/TestCaster.scala                |  11 +-
 .../transform/TransformStructuredSpec.scala   |  53 ++++++--
 .../transform/TransformUnstructuredSpec.scala |  25 ++--
 project/BuildSettings.scala                   |  11 ++
 project/Dependencies.scala                    |   2 +-
 12 files changed, 257 insertions(+), 103 deletions(-)

diff --git a/build.sbt b/build.sbt
index a2248b9..daa02e1 100644
--- a/build.sbt
+++ b/build.sbt
@@ -125,6 +125,7 @@ lazy val loadersCommon: Project = project
     .settings(BuildSettings.publishSettings)
     .settings(BuildSettings.mimaSettings)
     .settings(BuildSettings.docsSettings)
+    .settings(BuildSettings.igluTestSettings)
     .settings(
       previewFixedPort := Some(9999),
       Preprocess / preprocessVars := Map("VERSION" -> version.value)
diff --git a/modules/loaders-common/src/main/scala/com/snowplowanalytics/snowplow/loaders/transform/AtomicFields.scala b/modules/loaders-common/src/main/scala/com/snowplowanalytics/snowplow/loaders/transform/AtomicFields.scala
index cdc398e..9eac587 100644
--- a/modules/loaders-common/src/main/scala/com/snowplowanalytics/snowplow/loaders/transform/AtomicFields.scala
+++ b/modules/loaders-common/src/main/scala/com/snowplowanalytics/snowplow/loaders/transform/AtomicFields.scala
@@ -22,8 +22,8 @@ object AtomicFields {
    *
    * Does not include fields added by the loader, e.g. `load_tstamp`
    */
-  val static: List[Field] =
-    List(
+  val static: Vector[Field] =
+    Vector(
       Field("app_id", Type.String, Nullable),
       Field("platform", Type.String, Nullable),
       Field("etl_tstamp", Type.Timestamp, Nullable),
@@ -154,6 +154,6 @@ object AtomicFields {
       Field("true_tstamp", Type.Timestamp, Nullable)
     )

-  val withLoadTstamp: List[Field] =
+  val withLoadTstamp: Vector[Field] =
     static :+ Field("load_tstamp", Type.Timestamp, Required)
 }
diff --git a/modules/loaders-common/src/main/scala/com/snowplowanalytics/snowplow/loaders/transform/NonAtomicFields.scala b/modules/loaders-common/src/main/scala/com/snowplowanalytics/snowplow/loaders/transform/NonAtomicFields.scala
index 8ac3115..1226bda 100644
--- a/modules/loaders-common/src/main/scala/com/snowplowanalytics/snowplow/loaders/transform/NonAtomicFields.scala
+++ b/modules/loaders-common/src/main/scala/com/snowplowanalytics/snowplow/loaders/transform/NonAtomicFields.scala
@@ -69,16 +69,18 @@
           // We can't do it sooner based on a result of `fetchSchemasWithSameModel` because we can't have 'holes' in Iglu schema family when building typed entities.
           // Otherwise we may end up with invalid and incompatible merged schema model.
          // Here `TypedTabledEntity` is already properly created using contiguous series of schemas, so we can try to skip some sub-versions.
-          .map { typedTabledEntity =>
-            val filteredSubVersions = filterSubVersions(filterCriteria, typedTabledEntity.tabledEntity, typedTabledEntity.mergedVersions)
-            typedTabledEntity.copy(mergedVersions = filteredSubVersions)
+          .map {
+            _.map { typedTabledEntity =>
+              val filteredSubVersions = filterSubVersions(filterCriteria, typedTabledEntity.tabledEntity, typedTabledEntity.mergedVersions)
+              typedTabledEntity.copy(mergedVersions = filteredSubVersions)
+            }
           }
           .leftMap(ColumnFailure(tabledEntity, subVersions, _))
           .value
       }
       .map { eithers =>
         val (failures, good) = eithers.separate
-        Result(good, failures.toList)
+        Result(good.flatten, failures.toList)
       }

   private def filterSubVersions(
diff --git a/modules/loaders-common/src/main/scala/com/snowplowanalytics/snowplow/loaders/transform/SchemaProvider.scala b/modules/loaders-common/src/main/scala/com/snowplowanalytics/snowplow/loaders/transform/SchemaProvider.scala
index 31793a6..d7cac13 100644
--- a/modules/loaders-common/src/main/scala/com/snowplowanalytics/snowplow/loaders/transform/SchemaProvider.scala
+++ b/modules/loaders-common/src/main/scala/com/snowplowanalytics/snowplow/loaders/transform/SchemaProvider.scala
@@ -7,7 +7,7 @@
  */
 package com.snowplowanalytics.snowplow.loaders.transform

-import cats.data.{EitherT, NonEmptyList}
+import cats.data.{EitherT, NonEmptyVector}
 import cats.effect.Sync
 import cats.implicits._
 import com.snowplowanalytics.iglu.client.{ClientError, Resolver}
@@ -35,15 +35,15 @@ private[transform] object SchemaProvider {
   def fetchSchemasWithSameModel[F[_]: Sync: RegistryLookup](
     resolver: Resolver[F],
     schemaKey: SchemaKey
-  ): EitherT[F, FailureDetails.LoaderIgluError, NonEmptyList[SelfDescribingSchema[Schema]]] =
+  ): EitherT[F, FailureDetails.LoaderIgluError, NonEmptyVector[SelfDescribingSchema[Schema]]] =
     for {
       schemaKeys <- EitherT(resolver.listSchemasLike(schemaKey))
                       .leftMap(resolverFetchBadRow(schemaKey.vendor, schemaKey.name, schemaKey.format, schemaKey.version.model))
-                      .map(_.schemas)
+                      .map(_.schemas.toVector)
       schemaKeys <- EitherT.rightT[F, FailureDetails.LoaderIgluError](schemaKeys.filter(_ < schemaKey))
       topSchema <- getSchema(resolver, schemaKey)
       lowerSchemas <- schemaKeys.filter(_ < schemaKey).traverse(getSchema(resolver, _))
-    } yield NonEmptyList(topSchema, lowerSchemas)
+    } yield NonEmptyVector(topSchema, lowerSchemas)

   private def resolverFetchBadRow(
     vendor: String,
diff --git a/modules/loaders-common/src/main/scala/com/snowplowanalytics/snowplow/loaders/transform/TypedTabledEntity.scala b/modules/loaders-common/src/main/scala/com/snowplowanalytics/snowplow/loaders/transform/TypedTabledEntity.scala
index 8b5d0a6..a2cd9bc 100644
--- a/modules/loaders-common/src/main/scala/com/snowplowanalytics/snowplow/loaders/transform/TypedTabledEntity.scala
+++ b/modules/loaders-common/src/main/scala/com/snowplowanalytics/snowplow/loaders/transform/TypedTabledEntity.scala
@@ -7,7 +7,7 @@
  */
 package com.snowplowanalytics.snowplow.loaders.transform

-import cats.data.NonEmptyList
+import cats.data.NonEmptyVector
 import cats.implicits._
 import io.circe.syntax._
 import com.snowplowanalytics.iglu.core.{SchemaKey, SelfDescribingSchema}
@@ -56,38 +56,43 @@ object TypedTabledEntity {
   private[transform] def build(
     tabledEntity: TabledEntity,
     subVersions: Set[SchemaSubVersion],
-    schemas: NonEmptyList[SelfDescribingSchema[Schema]]
-  ): TypedTabledEntity = {
+    schemas: NonEmptyVector[SelfDescribingSchema[Schema]]
+  ): Option[TypedTabledEntity] =
     // Schemas need to be ordered by key to merge in correct order.
-    val NonEmptyList(root, tail) = schemas.sorted
-    val tte = tail
-      .foldLeft(initColumnGroup(tabledEntity, root)) { case (columnGroup, selfDescribingSchema) =>
-        val field = fieldFromSchema(tabledEntity, selfDescribingSchema.schema)
-        val schemaKey = selfDescribingSchema.self.schemaKey
-        val subversion = keyToSubVersion(schemaKey)
-        Migrations.mergeSchemas(columnGroup.mergedField, field) match {
-          case Left(_) =>
-            if (subVersions.contains(subversion)) {
-              val hash = "%08x".format(selfDescribingSchema.schema.asJson.noSpaces.hashCode())
-              // typedField always has a single element in matchingKeys
-              val recoverPoint = schemaKey.version.asString.replaceAll("-", "_")
-              val newName = s"${field.name}_recovered_${recoverPoint}_$hash"
-              columnGroup.copy(recoveries = (subversion -> field.copy(name = newName)) :: columnGroup.recoveries)
-            } else {
-              // do not create a recovered column if that type were not in the batch
-              columnGroup
+    schemas.sorted.toVector
+      .flatMap { case sds =>
+        fieldFromSchema(tabledEntity, sds.schema).map((_, sds))
+      }
+      .toNev
+      .map { nev =>
+        val (rootField, rootSchema) = nev.head
+        val tte = TypedTabledEntity(tabledEntity, rootField, Set(keyToSubVersion(rootSchema.self.schemaKey)), Nil)
+        nev.tail
+          .foldLeft(tte) { case (columnGroup, (field, selfDescribingSchema)) =>
+            val schemaKey = selfDescribingSchema.self.schemaKey
+            val subversion = keyToSubVersion(schemaKey)
+            Migrations.mergeSchemas(columnGroup.mergedField, field) match {
+              case Left(_) =>
+                if (subVersions.contains(subversion)) {
+                  val hash = "%08x".format(selfDescribingSchema.schema.asJson.noSpaces.hashCode())
+                  // typedField always has a single element in matchingKeys
+                  val recoverPoint = schemaKey.version.asString.replaceAll("-", "_")
+                  val newName = s"${field.name}_recovered_${recoverPoint}_$hash"
+                  columnGroup.copy(recoveries = (subversion -> field.copy(name = newName)) :: columnGroup.recoveries)
+                } else {
+                  // do not create a recovered column if that type were not in the batch
+                  columnGroup
+                }
+              case Right(mergedField) =>
+                columnGroup.copy(mergedField = mergedField, mergedVersions = columnGroup.mergedVersions + subversion)
             }
-          case Right(mergedField) =>
-            columnGroup.copy(mergedField = mergedField, mergedVersions = columnGroup.mergedVersions + subversion)
-        }
+          }
+      }
+      .map { tte =>
+        tte.copy(recoveries = tte.recoveries.reverse)
       }
-    tte.copy(recoveries = tte.recoveries.reverse)
-  }
-
-  private def initColumnGroup(tabledEntity: TabledEntity, root: SelfDescribingSchema[Schema]): TypedTabledEntity =
-    TypedTabledEntity(tabledEntity, fieldFromSchema(tabledEntity, root.schema), Set(keyToSubVersion(root.self.schemaKey)), Nil)

-  private def fieldFromSchema(tabledEntity: TabledEntity, schema: Schema): Field = {
+  private def fieldFromSchema(tabledEntity: TabledEntity, schema: Schema): Option[Field] = {
     val sdkEntityType = tabledEntity.entityType match {
       case TabledEntity.UnstructEvent => SdkData.UnstructEvent
       case TabledEntity.Context => SdkData.Contexts(SdkData.CustomContexts)
@@ -96,21 +101,18 @@
     val field = tabledEntity.entityType match {
       case TabledEntity.UnstructEvent =>
-        Field.normalize {
-          Field.build(fieldName, schema, enforceValuePresence = false)
-        }
+        Field.build(fieldName, schema, enforceValuePresence = false).map(Field.normalize(_))
       case TabledEntity.Context =>
         // Must normalize first and add the schema key field after. To avoid unlikely weird issues
         // with similar existing keys.
-        addSchemaVersionKey {
-          Field.normalize {
-            Field.buildRepeated(fieldName, schema, enforceItemPresence = true, Type.Nullability.Nullable)
-          }
-        }
+        Field
+          .buildRepeated(fieldName, schema, enforceItemPresence = true, Type.Nullability.Nullable)
+          .map(Field.normalize(_))
+          .map(addSchemaVersionKey(_))
     }

     // Accessors are meaningless for a schema's top-level field
-    field.copy(accessors = Set.empty)
+    field.map(_.copy(accessors = Set.empty))
   }

   private def keyToSubVersion(key: SchemaKey): SchemaSubVersion = (key.version.revision, key.version.addition)
@@ -118,11 +120,11 @@
   private def addSchemaVersionKey(field: Field): Field = {
     val fieldType = field.fieldType match {
       case arr @ Type.Array(struct @ Type.Struct(subFields), _) =>
-        val fixedFields = // Our special key takes priority over a key of the same name in the schema
-          Field("_schema_version", Type.String, Type.Nullability.Required) +: subFields.filter(
-            _.name =!= "_schema_version"
-          )
-        arr.copy(element = struct.copy(fields = fixedFields))
+        val head = Field("_schema_version", Type.String, Type.Nullability.Required)
+        val tail = subFields.filter( // Our special key takes priority over a key of the same name in the schema
+          _.name =!= "_schema_version"
+        )
+        arr.copy(element = struct.copy(fields = NonEmptyVector(head, tail)))
       case other =>
         // This is OK. It must be a weird schema, whose root type is not an object.
        // Unlikely but allowed according to our rules.
diff --git a/modules/loaders-common/src/test/scala/com.snowplowanalytics.snowplow.loaders/transform/NonAtomicFieldsSpec.scala b/modules/loaders-common/src/test/scala/com.snowplowanalytics.snowplow.loaders/transform/NonAtomicFieldsSpec.scala
index 510f9b3..da6c5a8 100644
--- a/modules/loaders-common/src/test/scala/com.snowplowanalytics.snowplow.loaders/transform/NonAtomicFieldsSpec.scala
+++ b/modules/loaders-common/src/test/scala/com.snowplowanalytics.snowplow.loaders/transform/NonAtomicFieldsSpec.scala
@@ -8,6 +8,7 @@
 package com.snowplowanalytics.snowplow.loaders.transform

 import cats.effect.IO
+import cats.data.NonEmptyVector
 import org.specs2.Specification
 import cats.effect.testing.specs2.CatsEffect
 import com.snowplowanalytics.iglu.client.Resolver
@@ -25,11 +26,15 @@
     return an un-merged schema if the batch uses the first schema in a series $ue1
     return a merged schema if the batch uses the last schema in a series $ue2
     return a merged schema if the batch uses all schemas in a series $ue3
+    return nothing for the Iglu Central ad_break_end_event schema $ue4
+    return a JSON field for the Iglu Central anything-a schema $ue5
   when resolving for known schemas in contexts should
     return an un-merged schema if the batch uses the first schema in a series $c1
     return a merged schema if the batch uses the last schema in a series $c2
     return a merged schema if the batch uses all schemas in a series $c3
+    return nothing for the Iglu Central ad_break_end_event schema $c4
+    return a JSON field for the Iglu Central anything-a schema $c5
   when resolving for known schema in contexts and unstruct_event should
     return separate entity for the context and the unstruct_event $both1
@@ -55,7 +60,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {

     val expected = {
       val expectedStruct = Type.Struct(
-        List(
+        NonEmptyVector.of(
           Field("col_a", Type.String, Required)
         )
       )
@@ -88,7 +93,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {

     val expected = {
       val expectedStruct = Type.Struct(
-        List(
+        NonEmptyVector.of(
           Field("col_a", Type.String, Required),
           Field("col_c", Type.String, Nullable),
           Field("col_b", Type.String, Nullable)
@@ -123,7 +128,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {

     val expected = {
       val expectedStruct = Type.Struct(
-        List(
+        NonEmptyVector.of(
           Field("col_a", Type.String, Required),
           Field("col_c", Type.String, Nullable),
           Field("col_b", Type.String, Nullable)
@@ -148,6 +153,51 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {

   }

+  def ue4 = {
+
+    // Example of a schema which is an empty object with no additional properties
+    val tabledEntity = TabledEntity(TabledEntity.UnstructEvent, "com.snowplowanalytics.snowplow.media", "ad_break_end_event", 1)
+
+    val input = Map(
+      tabledEntity -> Set((0, 0))
+    )
+
+    NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
+      (failures must beEmpty) and
+        (fields must beEmpty)
+    }
+
+  }
+
+  def ue5 = {
+
+    // Example of a permissive schema which permits any JSON
+    val tabledEntity = TabledEntity(TabledEntity.UnstructEvent, "com.snowplowanalytics.iglu", "anything-a", 1)
+
+    val input = Map(
+      tabledEntity -> Set((0, 0))
+    )
+
+    val expected = {
+      val expectedType = Type.Json
+      val expectedField = Field("unstruct_event_com_snowplowanalytics_iglu_anything_a_1", expectedType, Nullable, Set.empty)
+
+      TypedTabledEntity(
+        tabledEntity,
+        expectedField,
+        Set((0, 0)),
+        Nil
+      )
+    }
+
+    NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
+      (failures must beEmpty) and
+        (fields must haveSize(1)) and
+        (fields.head must beEqualTo(expected))
+    }
+
+  }
+
   def c1 = {

     val tabledEntity = TabledEntity(TabledEntity.Context, "myvendor", "myschema", 7)
@@ -158,7 +208,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {

     val expected = {
       val expectedStruct = Type.Struct(
-        List(
+        NonEmptyVector.of(
           Field("_schema_version", Type.String, Required),
           Field("col_a", Type.String, Required)
         )
       )
@@ -194,7 +244,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {

     val expected = {
       val expectedStruct = Type.Struct(
-        List(
+        NonEmptyVector.of(
           Field("_schema_version", Type.String, Required),
           Field("col_a", Type.String, Required),
           Field("col_c", Type.String, Nullable),
@@ -233,7 +283,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {

     val expected = {
       val expectedStruct = Type.Struct(
-        List(
+        NonEmptyVector.of(
           Field("_schema_version", Type.String, Required),
           Field("col_a", Type.String, Required),
           Field("col_c", Type.String, Nullable),
@@ -261,6 +311,55 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {

   }

+  def c4 = {
+
+    // Example of a schema which is an empty object with no additional properties
+    val tabledEntity = TabledEntity(TabledEntity.Context, "com.snowplowanalytics.snowplow.media", "ad_break_end_event", 1)
+
+    val input = Map(
+      tabledEntity -> Set((0, 0))
+    )
+
+    NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
+      (failures must beEmpty) and
+        (fields must beEmpty)
+    }
+
+  }
+
+  def c5 = {
+
+    // Example of a permissive schema which permits any JSON
+    val tabledEntity = TabledEntity(TabledEntity.Context, "com.snowplowanalytics.iglu", "anything-a", 1)
+
+    val input = Map(
+      tabledEntity -> Set((0, 0))
+    )
+
+    val expected = {
+
+      val expectedType = Type.Json
+
+      val expectedArray = Type.Array(expectedType, Nullable)
+
+      val expectedField = Field("contexts_com_snowplowanalytics_iglu_anything_a_1", expectedArray, Nullable, Set.empty)
+
+      TypedTabledEntity(
+        tabledEntity,
+        expectedField,
+        Set((0, 0)),
+        Nil
+      )
+    }
+
+    NonAtomicFields.resolveTypes(embeddedResolver, input, List.empty).map { case NonAtomicFields.Result(fields, failures) =>
+      (failures must beEmpty) and
+        (fields must haveSize(1)) and
+        (fields.head must beEqualTo(expected))
+    }
+
+  }
+
   def both1 = {

     val tabledEntity1 = TabledEntity(TabledEntity.UnstructEvent, "myvendor", "myschema", 7)
@@ -289,7 +388,7 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {

     val expected = {
       val expectedStruct = Type.Struct(
-        List(
+        NonEmptyVector.of(
           Field("col_a", Type.String, Required)
         )
       )
@@ -322,19 +421,19 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {

     val expected = {
       val expectedStruct = Type.Struct(
-        List(
+        NonEmptyVector.of(
           Field("col_a", Type.String, Required)
         )
       )

       val recoveryStruct1 = Type.Struct(
-        List(
+        NonEmptyVector.of(
           Field("col_a", Type.Double, Required)
         )
       )

       val recoveryStruct2 = Type.Struct(
-        List(
+        NonEmptyVector.of(
           Field("col_a", Type.Boolean, Required)
         )
       )
@@ -372,20 +471,20 @@ class NonAtomicFieldsSpec extends Specification with CatsEffect {

     val expected = {
       val expectedStruct = Type.Struct(
-        List(
+        NonEmptyVector.of(
           Field("col_a", Type.String, Required),
           Field("col_b", Type.Long, Nullable)
         )
       )

       val recoveryStruct1 = Type.Struct(
-        List(
+        NonEmptyVector.of(
           Field("col_a", Type.Double, Required)
         )
       )

       val recoveryStruct2 = Type.Struct(
-        List(
+        NonEmptyVector.of(
           Field("col_a", Type.Boolean, Required)
         )
       )
diff --git a/modules/loaders-common/src/test/scala/com.snowplowanalytics.snowplow.loaders/transform/SchemaProviderSpec.scala b/modules/loaders-common/src/test/scala/com.snowplowanalytics.snowplow.loaders/transform/SchemaProviderSpec.scala
index 81fd30b..9fa1843 100644
--- a/modules/loaders-common/src/test/scala/com.snowplowanalytics.snowplow.loaders/transform/SchemaProviderSpec.scala
+++ b/modules/loaders-common/src/test/scala/com.snowplowanalytics.snowplow.loaders/transform/SchemaProviderSpec.scala
@@ -7,7 +7,7 @@
  */
 package com.snowplowanalytics.snowplow.loaders.transform

-import cats.data.NonEmptyList
+import cats.data.NonEmptyVector
 import cats.effect.IO
 import io.circe.parser.{parse => parseToJson}
 import scala.io.Source
@@ -38,13 +38,13 @@ class SchemaProviderSpec extends Specification with CatsEffect {

   def e1 = {

-    val expected = List(
+    val expected = NonEmptyVector.of(
       SelfDescribingSchema(SchemaMap(testSchemaKey700), testSchema700)
     )

     SchemaProvider.fetchSchemasWithSameModel(embeddedResolver, testSchemaKey700).value.map { result =>
-      result must beRight { schemas: NonEmptyList[SelfDescribingSchema[Schema]] =>
-        schemas.toList must beEqualTo(expected)
+      result must beRight { schemas: NonEmptyVector[SelfDescribingSchema[Schema]] =>
+        schemas must beEqualTo(expected)
       }
     }

@@ -52,15 +52,15 @@ class SchemaProviderSpec extends Specification with CatsEffect {

   def e2 = {

-    val expected = List(
+    val expected = Vector(
       SelfDescribingSchema(SchemaMap(testSchemaKey700), testSchema700),
       SelfDescribingSchema(SchemaMap(testSchemaKey701), testSchema701),
       SelfDescribingSchema(SchemaMap(testSchemaKey710), testSchema710)
     )

     SchemaProvider.fetchSchemasWithSameModel(embeddedResolver, testSchemaKey710).value.map { result =>
-      result must beRight { schemas: NonEmptyList[SelfDescribingSchema[Schema]] =>
-        schemas.toList must containTheSameElementsAs(expected)
+      result must beRight { schemas: NonEmptyVector[SelfDescribingSchema[Schema]] =>
+        schemas.toVector must containTheSameElementsAs(expected)
       }
     }

@@ -68,14 +68,14 @@ class SchemaProviderSpec extends Specification with CatsEffect {

   def e3 = {

-    val expected = List(
+    val expected = Vector(
       SelfDescribingSchema(SchemaMap(testSchemaKey700), testSchema700),
       SelfDescribingSchema(SchemaMap(testSchemaKey701), testSchema701)
     )

     SchemaProvider.fetchSchemasWithSameModel(embeddedResolver, testSchemaKey701).value.map { result =>
-      result must beRight { schemas: NonEmptyList[SelfDescribingSchema[Schema]] =>
-        schemas.toList must containTheSameElementsAs(expected)
+      result must beRight { schemas: NonEmptyVector[SelfDescribingSchema[Schema]] =>
+        schemas.toVector must containTheSameElementsAs(expected)
       }
     }

diff --git a/modules/loaders-common/src/test/scala/com.snowplowanalytics.snowplow.loaders/transform/TestCaster.scala b/modules/loaders-common/src/test/scala/com.snowplowanalytics.snowplow.loaders/transform/TestCaster.scala
index 05514d2..ad3b532 100644
--- a/modules/loaders-common/src/test/scala/com.snowplowanalytics.snowplow.loaders/transform/TestCaster.scala
+++ b/modules/loaders-common/src/test/scala/com.snowplowanalytics.snowplow.loaders/transform/TestCaster.scala
@@ -7,6 +7,7 @@
  */
 package com.snowplowanalytics.snowplow.loaders.transform

+import cats.data.NonEmptyVector
 import io.circe.{Json, JsonObject}

 import java.time.{Instant, LocalDate}
@@ -24,10 +25,10 @@ object TestCaster extends Caster[Json] {
   override def doubleValue(v: Double): Json = Json.fromDoubleOrNull(v)
   override def decimalValue(unscaled: BigInt, details: Type.Decimal): Json =
     Json.fromBigDecimal(BigDecimal(unscaled, details.scale))
-  override def timestampValue(v: Instant): Json = Json.fromString(v.toString)
-  override def dateValue(v: LocalDate): Json = Json.fromString(v.toString)
-  override def arrayValue(vs: List[Json]): Json = Json.fromValues(vs)
-  override def structValue(vs: List[Caster.NamedValue[Json]]): Json =
-    Json.fromJsonObject(JsonObject.fromIterable(vs.map(nv => nv.name -> nv.value)))
+  override def timestampValue(v: Instant): Json = Json.fromString(v.toString)
+  override def dateValue(v: LocalDate): Json = Json.fromString(v.toString)
+  override def arrayValue(vs: Vector[Json]): Json = Json.fromValues(vs)
+  override def structValue(vs: NonEmptyVector[Caster.NamedValue[Json]]): Json =
+    Json.fromJsonObject(JsonObject.fromIterable(vs.toVector.map(nv => nv.name -> nv.value)))

 }
diff --git a/modules/loaders-common/src/test/scala/com.snowplowanalytics.snowplow.loaders/transform/TransformStructuredSpec.scala b/modules/loaders-common/src/test/scala/com.snowplowanalytics.snowplow.loaders/transform/TransformStructuredSpec.scala
index 1571f26..78b04ba 100644
--- a/modules/loaders-common/src/test/scala/com.snowplowanalytics.snowplow.loaders/transform/TransformStructuredSpec.scala
+++ b/modules/loaders-common/src/test/scala/com.snowplowanalytics.snowplow.loaders/transform/TransformStructuredSpec.scala
@@ -16,6 +16,7 @@
 import com.snowplowanalytics.iglu.schemaddl.parquet.{Field, Type}
 import com.snowplowanalytics.snowplow.analytics.scalasdk.{Event, SnowplowEvent}
 import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, FailureDetails, Processor => BadRowProcessor}
 import com.snowplowanalytics.snowplow.loaders.transform.NonAtomicFields.{ColumnFailure, Result}
+import cats.data.NonEmptyVector
 import io.circe._
 import io.circe.literal._
 import org.specs2.Specification
@@ -28,12 +29,12 @@ import scala.collection.immutable.SortedMap

 class TransformStructuredSpec extends Specification {

   private val simpleOneFieldSchema =
-    List(
+    NonEmptyVector.of(
       Field("my_string", Type.String, Required)
     )

   private val schemaWithAllPossibleTypes =
-    List(
+    NonEmptyVector.of(
       Field("my_string", Type.String, Required),
       Field("my_int", Type.Integer, Required),
       Field("my_long", Type.Long, Required),
@@ -43,7 +44,7 @@ class TransformStructuredSpec extends Specification {
       Field("my_date", Type.Date, Required),
       Field("my_timestamp", Type.Timestamp, Required),
       Field("my_array", Type.Array(Type.Integer, Required), Required),
-      Field("my_object", Type.Struct(List(Field("abc", Type.String, Required))), Required),
+      Field("my_object", Type.Struct(NonEmptyVector.of(Field("abc", Type.String, Required))), Required),
       Field("my_null", Type.String, Nullable)
     )
@@ -86,8 +87,10 @@ class TransformStructuredSpec extends Specification {
   Failures:
     Atomic currency field cannot be cast to a decimal due to rounding $atomicTooManyDecimalPoints
     Atomic currency field cannot be cast to a decimal due to high precision $atomicHighPrecision
-    Missing value for unstruct (null passed in required field) $unstructMissingValue
-    Missing value for context (null passed in required field) $contextMissingValue
+    Missing value for unstruct (missing required field) $unstructMissingValue
+    Missing value for unstruct (null passed in required field) $unstructNullValue
+    Missing value for context (missing required field) $contextMissingValue
+    Missing value for context (null passed in required field) $contextNullValue
     Cast error for unstruct (integer passed in string field) $unstructWrongType
     Cast error for context (integer passed in string field) $contextWrongType
     Iglu error in batch info becomes iglu transformation error $igluErrorInBatchInfo
@@ -421,6 +424,23 @@ class TransformStructuredSpec extends Specification {
   }

   def unstructMissingValue = {
+    val inputEvent =
+      createEvent(unstruct = Some(sdj(data = json"""{}""", key = "iglu:com.example/mySchema/jsonschema/1-0-0")))
+    val batchInfo = Result(
+      fields = Vector(mySchemaUnstruct(model = 1, subVersions = Set((0, 0)))),
+      igluFailures = List.empty
+    )
+
+    val expectedError = FailureDetails.LoaderIgluError.WrongType(
+      SchemaKey("com.example", "mySchema", "jsonschema", Full(1, 0, 0)),
+      value = json"""null""",
+      expected = "String"
+    )
+
+    assertLoaderError(inputEvent, batchInfo, List(expectedError))
+  }
+
+  def unstructNullValue = {
     val inputEvent =
       createEvent(unstruct = Some(sdj(data = json"""{ "my_string": null}""", key = "iglu:com.example/mySchema/jsonschema/1-0-0")))
     val batchInfo = Result(
@@ -455,6 +475,23 @@ class TransformStructuredSpec extends Specification {
   }

   def contextMissingValue = {
+    val inputEvent =
+      createEvent(contexts = List(sdj(data = json"""{}""", key = "iglu:com.example/mySchema/jsonschema/1-0-0")))
+    val batchInfo = Result(
+      fields = Vector(mySchemaContexts(model = 1, subVersions = Set((0, 0)))),
+      igluFailures = List.empty
+    )
+
+    val expectedError = FailureDetails.LoaderIgluError.WrongType(
+      SchemaKey("com.example", "mySchema", "jsonschema", Full(1, 0, 0)),
+      value = json"""null""",
+      expected = "String"
+    )
+
+    assertLoaderError(inputEvent, batchInfo, List(expectedError))
+  }
+
+  def contextNullValue = {
     val inputEvent =
       createEvent(contexts = List(sdj(data = json"""{ "my_string": null}""", key = "iglu:com.example/mySchema/jsonschema/1-0-0")))
     val batchInfo = Result(
@@ -558,7 +595,7 @@
   private def mySchemaUnstruct(
     model: Int,
     subVersions: Set[SchemaSubVersion],
-    ddl: List[Field] = simpleOneFieldSchema
+    ddl: NonEmptyVector[Field] = simpleOneFieldSchema
   ) = TypedTabledEntity(
     tabledEntity = TabledEntity(TabledEntity.UnstructEvent, "com.example", "mySchema", model),
     mergedField = Field(s"unstruct_event_com_example_my_schema_$model", Type.Struct(ddl), Nullable, Set.empty),
@@ -569,9 +606,9 @@
   private def mySchemaContexts(
     model: Int,
     subVersions: Set[SchemaSubVersion],
-    ddl: List[Field] = simpleOneFieldSchema
+    ddl: NonEmptyVector[Field] = simpleOneFieldSchema
   ): TypedTabledEntity = {
-    val withSchemaVersion = Field("_schema_version", Type.String, Required) :: ddl
+    val withSchemaVersion = Field("_schema_version", Type.String, Required) +: ddl
     TypedTabledEntity(
       tabledEntity = TabledEntity(TabledEntity.Context, "com.example", "mySchema", model),
       mergedField =
diff --git a/modules/loaders-common/src/test/scala/com.snowplowanalytics.snowplow.loaders/transform/TransformUnstructuredSpec.scala b/modules/loaders-common/src/test/scala/com.snowplowanalytics.snowplow.loaders/transform/TransformUnstructuredSpec.scala
index 410cb93..010698f 100644
--- a/modules/loaders-common/src/test/scala/com.snowplowanalytics.snowplow.loaders/transform/TransformUnstructuredSpec.scala
+++ b/modules/loaders-common/src/test/scala/com.snowplowanalytics.snowplow.loaders/transform/TransformUnstructuredSpec.scala
@@ -8,6 +8,7 @@
 package com.snowplowanalytics.snowplow.loaders.transform

 import org.specs2.Specification
+import cats.data.NonEmptyVector
 import io.circe._
 import io.circe.literal._
 import io.circe.syntax._
@@ -283,18 +284,18 @@ object TransformUnstructuredSpec {
   val testSchemaKey801 = SchemaKey.fromUri("iglu:com.example/mySchema/jsonschema/8-0-1").toOption.get

   val typeCaster = new Caster[String] {
-    override def nullValue: String = "null"
-    override def jsonValue(v: Json): String = "json"
-    override def stringValue(v: String): String = "string"
-    override def booleanValue(v: Boolean): String = "boolean"
-    override def intValue(v: Int): String = "int"
-    override def longValue(v: Long): String = "long"
-    override def doubleValue(v: Double): String = "double"
-    override def decimalValue(unscaled: BigInt, details: Type.Decimal): String = "double"
-    override def timestampValue(v: Instant): String = "timestamp"
-    override def dateValue(v: LocalDate): String = "date"
-    override def arrayValue(vs: List[String]): String = "array"
-    override def structValue(vs: List[Caster.NamedValue[String]]): String = "struct"
+    override def nullValue: String = "null"
+    override def jsonValue(v: Json): String = "json"
+    override def stringValue(v: String): String = "string"
+    override def booleanValue(v: Boolean): String = "boolean"
+    override def intValue(v: Int): String = "int"
+    override def longValue(v: Long): String = "long"
+    override def doubleValue(v: Double): String = "double"
+    override def decimalValue(unscaled: BigInt, details: Type.Decimal): String = "double"
+    override def timestampValue(v: Instant): String = "timestamp"
+    override def dateValue(v: LocalDate): String = "date"
+    override def arrayValue(vs: Vector[String]): String = "array"
+    override def structValue(vs: NonEmptyVector[Caster.NamedValue[String]]): String = "struct"
   }

   val typeCirceFolder = new Json.Folder[String] {
diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala
index 18724b5..ddaa94c 100644
--- a/project/BuildSettings.scala
+++ b/project/BuildSettings.scala
@@ -28,6 +28,9 @@
 import com.typesafe.sbt.site.SiteScaladocPlugin.autoImport._
 import com.github.sbt.git.SbtGit.git
 import com.github.sbt.sbtghpages.GhpagesPlugin.autoImport.ghpagesNoJekyll
+// Iglu plugin
+import com.snowplowanalytics.snowplow.sbt.IgluSchemaPlugin.autoImport._
+
 object BuildSettings {

   lazy val scala212 = "2.12.18"
@@ -107,4 +110,12 @@ object BuildSettings {
     git.remoteRepo := "git@github.com:snowplow-incubator/common-streams.git",
     ghpagesNoJekyll := true
   )
+
+  val igluTestSettings = Seq(
+    Test / igluUris := Seq(
+      // Iglu Central schemas used in tests will get pre-fetched by sbt
+      "iglu:com.snowplowanalytics.iglu/anything-a/jsonschema/1-0-0",
+      "iglu:com.snowplowanalytics.snowplow.media/ad_break_end_event/jsonschema/1-0-0"
+    )
+  )
 }
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 4ce940a..93a458f 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -38,7 +38,7 @@ object Dependencies {
     val azureSdk = "1.11.4"

     // Snowplow
-    val schemaDdl = "0.22.2"
+    val schemaDdl = "0.23.0"
     val badrows = "2.3.0"
     val igluClient = "3.1.0"
     val tracker = "2.0.0"
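
The behaviour change is easiest to see through `Field.build` from schema-ddl, which this patch bumps to 0.23.0. In that version `Field.build` returns `Option[Field]`: `None` for an empty schema with no nested fields (so `fieldFromSchema`, and in turn `TypedTabledEntity.build`, omits the entity entirely), and a JSON-typed field for an empty schema that still allows additional properties. The following standalone sketch is not part of the patch; it assumes schema-ddl 0.23.0 on the classpath, and the inline schema literals and the column name `example_column` are illustrative only:

import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits._
import com.snowplowanalytics.iglu.schemaddl.parquet.Field
import io.circe.literal._

object EmptySchemaSketch extends App {

  // An empty object schema that forbids extra properties, like
  // com.snowplowanalytics.snowplow.media/ad_break_end_event: with
  // schema-ddl 0.23.0, Field.build returns None, so no parquet
  // column is created for the entity at all.
  val closed = Schema.parse(json"""{"type": "object", "additionalProperties": false}""").get
  println(Field.build("example_column", closed, enforceValuePresence = false))
  // expected: None

  // An empty schema that still permits any JSON, like
  // com.snowplowanalytics.iglu/anything-a: the old behaviour is kept,
  // the field gets the Json type and the original JSON is loaded.
  val open = Schema.parse(json"""{"type": "object", "additionalProperties": true}""").get
  println(Field.build("example_column", open, enforceValuePresence = false))
  // expected: Some field with the Json type
}

Because `fieldFromSchema` propagates the `None` through `TypedTabledEntity.build`, an entity whose schemas all collapse to `None` simply disappears from the non-atomic column set, which is what the new `ue4`/`c4` tests assert, while the `ue5`/`c5` tests pin down the JSON-column fallback.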