diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/LegacyColumns.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/LegacyColumns.scala index 183ac088..77fcb661 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/LegacyColumns.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/LegacyColumns.scala @@ -125,8 +125,8 @@ object LegacyColumns { case TabledEntity.Context => LegacyMode.Repeated } val columnName = legacyColumnName(entityType, schemaKey) - val legacyField = LegacyField.build(columnName, schema, false).setMode(mode).normalized - val v2Field = legacyFieldToV2Field(legacyField).copy(accessors = Set.empty) + val legacyField = LegacyField.build(columnName, schema, false).setMode(mode) + val v2Field = V2Field.normalize(legacyFieldToV2Field(legacyField)) FieldForEntity(v2Field, schemaKey, entityType) } .leftMap(ColumnFailure(schemaKey, entityType, _)) @@ -160,6 +160,24 @@ object LegacyColumns { BadRow.LoaderIgluError(processor, BadRowFailure.LoaderIgluErrors(nel), BadPayload.LoaderPayload(event)) }.toEither + /** + * Replaces any Json type with a String type + * + * This must be run after transformation, but before handling schema evolution. It is needed + * because the legacy loader used String columns where the v2 loader would create Json columns + */ + def dropJsonTypes(v2Field: V2Field): V2Field = { + def dropFromType(t: V2Type): V2Type = t match { + case V2Type.Json => V2Type.String + case V2Type.Array(element, nullability) => V2Type.Array(dropFromType(element), nullability) + case V2Type.Struct(fields) => V2Type.Struct(fields.map(dropJsonTypes)) + case other => other + } + + v2Field.copy(fieldType = dropFromType(v2Field.fieldType)) + + } + /** * Convert from the legacy Field of the old v1 loader into the new style Field. * diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala index 8646f2f0..2182d19c 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala @@ -153,7 +153,7 @@ object Processing { (moreBad, rows) <- transformBatch[F](badProcessor, loadTstamp, events, v2NonAtomicFields, legacyFields) fields = v2NonAtomicFields.fields.flatMap { tte => tte.mergedField :: tte.recoveries.map(_._2) - } ++ legacyFields.fields.map(_.field) + } ++ legacyFields.fields.map(f => LegacyColumns.dropJsonTypes(f.field)) } yield BatchAfterTransform( rows, fields, diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Writer.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Writer.scala index f4720788..c9d14282 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Writer.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Writer.scala @@ -8,7 +8,6 @@ */ package com.snowplowanalytics.snowplow.bigquery.processing -import cats.Applicative import cats.implicits._ import cats.effect.{Async, Poll, Resource, Sync} import com.google.api.gax.batching.FlowControlSettings @@ -166,25 +165,20 @@ object Writer { def write(rows: List[Map[String, AnyRef]]): F[WriteResult] = Sync[F] .delay(writer.append(new JSONArray(rows.map(_.asJava).asJava))) - .attemptNarrow[BQExceptions.AppendSerializationError] - .flatMap[WriteResult] { - case Right(fut) => - FutureInterop - .fromFuture(fut) - .as[WriteResult](WriteResult.Success) - .recover { - case e: BQExceptions.SchemaMismatchedException => - WriteResult.ServerSideSchemaMismatch(e) - case e: BQExceptions.StreamWriterClosedException => - WriteResult.WriterWasClosedByEarlierError(e) - } - case Left(appendSerializationError) => - Applicative[F].pure { - WriteResult.SerializationFailures { - appendSerializationError.getRowIndexToErrorMessage.asScala.map { case (i, cause) => - i.toInt -> cause - }.toMap - } + .flatMap(FutureInterop.fromFuture(_)) + .as[WriteResult](WriteResult.Success) + .recover { + // Some of these errors can happen either client side during `writer.append` + // ...or server-side during `FutureInterop.fromFuture(_)` + case e: BQExceptions.SchemaMismatchedException => + WriteResult.ServerSideSchemaMismatch(e) + case e: BQExceptions.StreamWriterClosedException => + WriteResult.WriterWasClosedByEarlierError(e) + case e: BQExceptions.AppendSerializationError => + WriteResult.SerializationFailures { + e.getRowIndexToErrorMessage.asScala.map { case (i, cause) => + i.toInt -> cause + }.toMap } } diff --git a/modules/core/src/test/resources/iglu-client-embedded/schemas/myvendor/myschema/jsonschema/7-0-0 b/modules/core/src/test/resources/iglu-client-embedded/schemas/myvendor/myschema/jsonschema/7-0-0 index dffe9d5e..42380727 100644 --- a/modules/core/src/test/resources/iglu-client-embedded/schemas/myvendor/myschema/jsonschema/7-0-0 +++ b/modules/core/src/test/resources/iglu-client-embedded/schemas/myvendor/myschema/jsonschema/7-0-0 @@ -8,7 +8,8 @@ }, "type": "object", "properties": { - "col_a": {"type": "string"} + "col_snake": {"type": "string"}, + "colCamel": {"type": "string"} }, - "required": ["col_a"] + "required": ["col_snake", "colCamel"] } diff --git a/modules/core/src/test/resources/iglu-client-embedded/schemas/myvendor/myschema/jsonschema/7-0-1 b/modules/core/src/test/resources/iglu-client-embedded/schemas/myvendor/myschema/jsonschema/7-0-1 index dd26ec68..c800a170 100644 --- a/modules/core/src/test/resources/iglu-client-embedded/schemas/myvendor/myschema/jsonschema/7-0-1 +++ b/modules/core/src/test/resources/iglu-client-embedded/schemas/myvendor/myschema/jsonschema/7-0-1 @@ -8,8 +8,9 @@ }, "type": "object", "properties": { - "col_a": {"type": "string"}, - "col_b": {"type": "string"} + "col_snake": {"type": "string"}, + "colCamel": {"type": "string"}, + "col_extra": {"type": "string"} }, - "required": ["col_a", "col_b"] + "required": ["col_snake", "colCamel", "col_extra"] } diff --git a/modules/core/src/test/resources/iglu-client-embedded/schemas/myvendor/myschema/jsonschema/7-1-0 b/modules/core/src/test/resources/iglu-client-embedded/schemas/myvendor/myschema/jsonschema/7-1-0 index 2f381d09..ba4db51c 100644 --- a/modules/core/src/test/resources/iglu-client-embedded/schemas/myvendor/myschema/jsonschema/7-1-0 +++ b/modules/core/src/test/resources/iglu-client-embedded/schemas/myvendor/myschema/jsonschema/7-1-0 @@ -8,8 +8,9 @@ }, "type": "object", "properties": { - "col_a": {"type": "string"}, - "col_c": {"type": "integer"} + "col_snake": {"type": "string"}, + "colCamel": {"type": "string"}, + "col_other": {"type": "integer"} }, - "required": ["col_a"] + "required": ["col_snake", "colCamel"] } diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/LegacyColumnsResolveSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/LegacyColumnsResolveSpec.scala index ba4c33e0..689a1c7d 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/LegacyColumnsResolveSpec.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/LegacyColumnsResolveSpec.scala @@ -58,11 +58,12 @@ class LegacyColumnsResolveSpec extends Specification with CatsEffect { val expected = { val expectedStruct = Type.Struct( NonEmptyVector.of( - Field("col_a", Type.Json, Required) + Field("col_camel", Type.Json, Required, Set("colCamel")), + Field("col_snake", Type.Json, Required, Set("col_snake")) ) ) - val expectedField = Field("unstruct_event_myvendor_myschema_7_0_0", expectedStruct, Nullable, Set.empty) + val expectedField = Field("unstruct_event_myvendor_myschema_7_0_0", expectedStruct, Nullable) LegacyColumns.FieldForEntity( expectedField, @@ -92,11 +93,12 @@ class LegacyColumnsResolveSpec extends Specification with CatsEffect { val expected100 = { val expectedStruct = Type.Struct( NonEmptyVector.of( - Field("col_a", Type.Json, Required) + Field("col_camel", Type.Json, Required, Set("colCamel")), + Field("col_snake", Type.Json, Required, Set("col_snake")) ) ) - val expectedField = Field("unstruct_event_myvendor_myschema_7_0_0", expectedStruct, Nullable, Set.empty) + val expectedField = Field("unstruct_event_myvendor_myschema_7_0_0", expectedStruct, Nullable) LegacyColumns.FieldForEntity( expectedField, @@ -108,12 +110,13 @@ class LegacyColumnsResolveSpec extends Specification with CatsEffect { val expected110 = { val expectedStruct = Type.Struct( NonEmptyVector.of( - Field("col_a", Type.Json, Required), - Field("col_c", Type.Long, Nullable) + Field("col_camel", Type.Json, Required, Set("colCamel")), + Field("col_snake", Type.Json, Required, Set("col_snake")), + Field("col_other", Type.Long, Nullable, Set("col_other")) ) ) - val expectedField = Field("unstruct_event_myvendor_myschema_7_1_0", expectedStruct, Nullable, Set.empty) + val expectedField = Field("unstruct_event_myvendor_myschema_7_1_0", expectedStruct, Nullable) LegacyColumns.FieldForEntity( expectedField, @@ -145,7 +148,7 @@ class LegacyColumnsResolveSpec extends Specification with CatsEffect { val expected = { val expectedType = Type.Json val expectedField = - Field("unstruct_event_com_snowplowanalytics_snowplow_media_ad_break_end_event_1_0_0", expectedType, Nullable, Set.empty) + Field("unstruct_event_com_snowplowanalytics_snowplow_media_ad_break_end_event_1_0_0", expectedType, Nullable) LegacyColumns.FieldForEntity( expectedField, @@ -175,7 +178,7 @@ class LegacyColumnsResolveSpec extends Specification with CatsEffect { val expected = { val expectedType = Type.Json - val expectedField = Field("unstruct_event_com_snowplowanalytics_iglu_anything_a_1_0_0", expectedType, Nullable, Set.empty) + val expectedField = Field("unstruct_event_com_snowplowanalytics_iglu_anything_a_1_0_0", expectedType, Nullable) LegacyColumns.FieldForEntity( expectedField, @@ -223,13 +226,14 @@ class LegacyColumnsResolveSpec extends Specification with CatsEffect { val expected = { val expectedStruct = Type.Struct( NonEmptyVector.of( - Field("col_a", Type.Json, Required) + Field("col_camel", Type.Json, Required, Set("colCamel")), + Field("col_snake", Type.Json, Required, Set("col_snake")) ) ) val expectedArray = Type.Array(expectedStruct, Nullable) - val expectedField = Field("contexts_myvendor_myschema_7_0_0", expectedArray, Nullable, Set.empty) + val expectedField = Field("contexts_myvendor_myschema_7_0_0", expectedArray, Nullable) LegacyColumns.FieldForEntity( expectedField, @@ -259,13 +263,14 @@ class LegacyColumnsResolveSpec extends Specification with CatsEffect { val expected100 = { val expectedStruct = Type.Struct( NonEmptyVector.of( - Field("col_a", Type.Json, Required) + Field("col_camel", Type.Json, Required, Set("colCamel")), + Field("col_snake", Type.Json, Required, Set("col_snake")) ) ) val expectedArray = Type.Array(expectedStruct, Nullable) - val expectedField = Field("contexts_myvendor_myschema_7_0_0", expectedArray, Nullable, Set.empty) + val expectedField = Field("contexts_myvendor_myschema_7_0_0", expectedArray, Nullable) LegacyColumns.FieldForEntity( expectedField, @@ -277,14 +282,15 @@ class LegacyColumnsResolveSpec extends Specification with CatsEffect { val expected110 = { val expectedStruct = Type.Struct( NonEmptyVector.of( - Field("col_a", Type.Json, Required), - Field("col_c", Type.Long, Nullable) + Field("col_camel", Type.Json, Required, Set("colCamel")), + Field("col_snake", Type.Json, Required, Set("col_snake")), + Field("col_other", Type.Long, Nullable, Set("col_other")) ) ) val expectedArray = Type.Array(expectedStruct, Nullable) - val expectedField = Field("contexts_myvendor_myschema_7_1_0", expectedArray, Nullable, Set.empty) + val expectedField = Field("contexts_myvendor_myschema_7_1_0", expectedArray, Nullable) LegacyColumns.FieldForEntity( expectedField, @@ -316,7 +322,7 @@ class LegacyColumnsResolveSpec extends Specification with CatsEffect { val expected = { val expectedArray = Type.Array(Type.Json, Nullable) val expectedField = - Field("contexts_com_snowplowanalytics_snowplow_media_ad_break_end_event_1_0_0", expectedArray, Nullable, Set.empty) + Field("contexts_com_snowplowanalytics_snowplow_media_ad_break_end_event_1_0_0", expectedArray, Nullable) LegacyColumns.FieldForEntity( expectedField, @@ -350,7 +356,7 @@ class LegacyColumnsResolveSpec extends Specification with CatsEffect { val expectedArray = Type.Array(expectedType, Nullable) - val expectedField = Field("contexts_com_snowplowanalytics_iglu_anything_a_1_0_0", expectedArray, Nullable, Set.empty) + val expectedField = Field("contexts_com_snowplowanalytics_iglu_anything_a_1_0_0", expectedArray, Nullable) LegacyColumns.FieldForEntity( expectedField,