Skip to content

Commit

Permalink
Feature flag to support the legacy column style -- bug fixes (#380)
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter authored Jul 17, 2024
1 parent 819f267 commit 3ba7f06
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ object LegacyColumns {
case TabledEntity.Context => LegacyMode.Repeated
}
val columnName = legacyColumnName(entityType, schemaKey)
val legacyField = LegacyField.build(columnName, schema, false).setMode(mode).normalized
val v2Field = legacyFieldToV2Field(legacyField).copy(accessors = Set.empty)
val legacyField = LegacyField.build(columnName, schema, false).setMode(mode)
val v2Field = V2Field.normalize(legacyFieldToV2Field(legacyField))
FieldForEntity(v2Field, schemaKey, entityType)
}
.leftMap(ColumnFailure(schemaKey, entityType, _))
Expand Down Expand Up @@ -160,6 +160,24 @@ object LegacyColumns {
BadRow.LoaderIgluError(processor, BadRowFailure.LoaderIgluErrors(nel), BadPayload.LoaderPayload(event))
}.toEither

/**
* Replaces any Json type with a String type
*
* This must be run after transformation, but before handling schema evolution. It is needed
* because the legacy loader used String columns where the v2 loader would create Json columns
*/
def dropJsonTypes(v2Field: V2Field): V2Field = {
def dropFromType(t: V2Type): V2Type = t match {
case V2Type.Json => V2Type.String
case V2Type.Array(element, nullability) => V2Type.Array(dropFromType(element), nullability)
case V2Type.Struct(fields) => V2Type.Struct(fields.map(dropJsonTypes))
case other => other
}

v2Field.copy(fieldType = dropFromType(v2Field.fieldType))

}

/**
* Convert from the legacy Field of the old v1 loader into the new style Field.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ object Processing {
(moreBad, rows) <- transformBatch[F](badProcessor, loadTstamp, events, v2NonAtomicFields, legacyFields)
fields = v2NonAtomicFields.fields.flatMap { tte =>
tte.mergedField :: tte.recoveries.map(_._2)
} ++ legacyFields.fields.map(_.field)
} ++ legacyFields.fields.map(f => LegacyColumns.dropJsonTypes(f.field))
} yield BatchAfterTransform(
rows,
fields,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
*/
package com.snowplowanalytics.snowplow.bigquery.processing

import cats.Applicative
import cats.implicits._
import cats.effect.{Async, Poll, Resource, Sync}
import com.google.api.gax.batching.FlowControlSettings
Expand Down Expand Up @@ -166,25 +165,20 @@ object Writer {
def write(rows: List[Map[String, AnyRef]]): F[WriteResult] =
Sync[F]
.delay(writer.append(new JSONArray(rows.map(_.asJava).asJava)))
.attemptNarrow[BQExceptions.AppendSerializationError]
.flatMap[WriteResult] {
case Right(fut) =>
FutureInterop
.fromFuture(fut)
.as[WriteResult](WriteResult.Success)
.recover {
case e: BQExceptions.SchemaMismatchedException =>
WriteResult.ServerSideSchemaMismatch(e)
case e: BQExceptions.StreamWriterClosedException =>
WriteResult.WriterWasClosedByEarlierError(e)
}
case Left(appendSerializationError) =>
Applicative[F].pure {
WriteResult.SerializationFailures {
appendSerializationError.getRowIndexToErrorMessage.asScala.map { case (i, cause) =>
i.toInt -> cause
}.toMap
}
.flatMap(FutureInterop.fromFuture(_))
.as[WriteResult](WriteResult.Success)
.recover {
// Some of these errors can happen either client side during `writer.append`
// ...or server-side during `FutureInterop.fromFuture(_)`
case e: BQExceptions.SchemaMismatchedException =>
WriteResult.ServerSideSchemaMismatch(e)
case e: BQExceptions.StreamWriterClosedException =>
WriteResult.WriterWasClosedByEarlierError(e)
case e: BQExceptions.AppendSerializationError =>
WriteResult.SerializationFailures {
e.getRowIndexToErrorMessage.asScala.map { case (i, cause) =>
i.toInt -> cause
}.toMap
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
},
"type": "object",
"properties": {
"col_a": {"type": "string"}
"col_snake": {"type": "string"},
"colCamel": {"type": "string"}
},
"required": ["col_a"]
"required": ["col_snake", "colCamel"]
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
},
"type": "object",
"properties": {
"col_a": {"type": "string"},
"col_b": {"type": "string"}
"col_snake": {"type": "string"},
"colCamel": {"type": "string"},
"col_extra": {"type": "string"}
},
"required": ["col_a", "col_b"]
"required": ["col_snake", "colCamel", "col_extra"]
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
},
"type": "object",
"properties": {
"col_a": {"type": "string"},
"col_c": {"type": "integer"}
"col_snake": {"type": "string"},
"colCamel": {"type": "string"},
"col_other": {"type": "integer"}
},
"required": ["col_a"]
"required": ["col_snake", "colCamel"]
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ class LegacyColumnsResolveSpec extends Specification with CatsEffect {
val expected = {
val expectedStruct = Type.Struct(
NonEmptyVector.of(
Field("col_a", Type.Json, Required)
Field("col_camel", Type.Json, Required, Set("colCamel")),
Field("col_snake", Type.Json, Required, Set("col_snake"))
)
)

val expectedField = Field("unstruct_event_myvendor_myschema_7_0_0", expectedStruct, Nullable, Set.empty)
val expectedField = Field("unstruct_event_myvendor_myschema_7_0_0", expectedStruct, Nullable)

LegacyColumns.FieldForEntity(
expectedField,
Expand Down Expand Up @@ -92,11 +93,12 @@ class LegacyColumnsResolveSpec extends Specification with CatsEffect {
val expected100 = {
val expectedStruct = Type.Struct(
NonEmptyVector.of(
Field("col_a", Type.Json, Required)
Field("col_camel", Type.Json, Required, Set("colCamel")),
Field("col_snake", Type.Json, Required, Set("col_snake"))
)
)

val expectedField = Field("unstruct_event_myvendor_myschema_7_0_0", expectedStruct, Nullable, Set.empty)
val expectedField = Field("unstruct_event_myvendor_myschema_7_0_0", expectedStruct, Nullable)

LegacyColumns.FieldForEntity(
expectedField,
Expand All @@ -108,12 +110,13 @@ class LegacyColumnsResolveSpec extends Specification with CatsEffect {
val expected110 = {
val expectedStruct = Type.Struct(
NonEmptyVector.of(
Field("col_a", Type.Json, Required),
Field("col_c", Type.Long, Nullable)
Field("col_camel", Type.Json, Required, Set("colCamel")),
Field("col_snake", Type.Json, Required, Set("col_snake")),
Field("col_other", Type.Long, Nullable, Set("col_other"))
)
)

val expectedField = Field("unstruct_event_myvendor_myschema_7_1_0", expectedStruct, Nullable, Set.empty)
val expectedField = Field("unstruct_event_myvendor_myschema_7_1_0", expectedStruct, Nullable)

LegacyColumns.FieldForEntity(
expectedField,
Expand Down Expand Up @@ -145,7 +148,7 @@ class LegacyColumnsResolveSpec extends Specification with CatsEffect {
val expected = {
val expectedType = Type.Json
val expectedField =
Field("unstruct_event_com_snowplowanalytics_snowplow_media_ad_break_end_event_1_0_0", expectedType, Nullable, Set.empty)
Field("unstruct_event_com_snowplowanalytics_snowplow_media_ad_break_end_event_1_0_0", expectedType, Nullable)

LegacyColumns.FieldForEntity(
expectedField,
Expand Down Expand Up @@ -175,7 +178,7 @@ class LegacyColumnsResolveSpec extends Specification with CatsEffect {

val expected = {
val expectedType = Type.Json
val expectedField = Field("unstruct_event_com_snowplowanalytics_iglu_anything_a_1_0_0", expectedType, Nullable, Set.empty)
val expectedField = Field("unstruct_event_com_snowplowanalytics_iglu_anything_a_1_0_0", expectedType, Nullable)

LegacyColumns.FieldForEntity(
expectedField,
Expand Down Expand Up @@ -223,13 +226,14 @@ class LegacyColumnsResolveSpec extends Specification with CatsEffect {
val expected = {
val expectedStruct = Type.Struct(
NonEmptyVector.of(
Field("col_a", Type.Json, Required)
Field("col_camel", Type.Json, Required, Set("colCamel")),
Field("col_snake", Type.Json, Required, Set("col_snake"))
)
)

val expectedArray = Type.Array(expectedStruct, Nullable)

val expectedField = Field("contexts_myvendor_myschema_7_0_0", expectedArray, Nullable, Set.empty)
val expectedField = Field("contexts_myvendor_myschema_7_0_0", expectedArray, Nullable)

LegacyColumns.FieldForEntity(
expectedField,
Expand Down Expand Up @@ -259,13 +263,14 @@ class LegacyColumnsResolveSpec extends Specification with CatsEffect {
val expected100 = {
val expectedStruct = Type.Struct(
NonEmptyVector.of(
Field("col_a", Type.Json, Required)
Field("col_camel", Type.Json, Required, Set("colCamel")),
Field("col_snake", Type.Json, Required, Set("col_snake"))
)
)

val expectedArray = Type.Array(expectedStruct, Nullable)

val expectedField = Field("contexts_myvendor_myschema_7_0_0", expectedArray, Nullable, Set.empty)
val expectedField = Field("contexts_myvendor_myschema_7_0_0", expectedArray, Nullable)

LegacyColumns.FieldForEntity(
expectedField,
Expand All @@ -277,14 +282,15 @@ class LegacyColumnsResolveSpec extends Specification with CatsEffect {
val expected110 = {
val expectedStruct = Type.Struct(
NonEmptyVector.of(
Field("col_a", Type.Json, Required),
Field("col_c", Type.Long, Nullable)
Field("col_camel", Type.Json, Required, Set("colCamel")),
Field("col_snake", Type.Json, Required, Set("col_snake")),
Field("col_other", Type.Long, Nullable, Set("col_other"))
)
)

val expectedArray = Type.Array(expectedStruct, Nullable)

val expectedField = Field("contexts_myvendor_myschema_7_1_0", expectedArray, Nullable, Set.empty)
val expectedField = Field("contexts_myvendor_myschema_7_1_0", expectedArray, Nullable)

LegacyColumns.FieldForEntity(
expectedField,
Expand Down Expand Up @@ -316,7 +322,7 @@ class LegacyColumnsResolveSpec extends Specification with CatsEffect {
val expected = {
val expectedArray = Type.Array(Type.Json, Nullable)
val expectedField =
Field("contexts_com_snowplowanalytics_snowplow_media_ad_break_end_event_1_0_0", expectedArray, Nullable, Set.empty)
Field("contexts_com_snowplowanalytics_snowplow_media_ad_break_end_event_1_0_0", expectedArray, Nullable)

LegacyColumns.FieldForEntity(
expectedField,
Expand Down Expand Up @@ -350,7 +356,7 @@ class LegacyColumnsResolveSpec extends Specification with CatsEffect {

val expectedArray = Type.Array(expectedType, Nullable)

val expectedField = Field("contexts_com_snowplowanalytics_iglu_anything_a_1_0_0", expectedArray, Nullable, Set.empty)
val expectedField = Field("contexts_com_snowplowanalytics_iglu_anything_a_1_0_0", expectedArray, Nullable)

LegacyColumns.FieldForEntity(
expectedField,
Expand Down

0 comments on commit 3ba7f06

Please sign in to comment.