Skip to content

Commit

Permalink
Feature flag to support the legacy column style -- bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Jul 16, 2024
1 parent 819f267 commit 2f3701b
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ object LegacyColumns {
case TabledEntity.Context => LegacyMode.Repeated
}
val columnName = legacyColumnName(entityType, schemaKey)
val legacyField = LegacyField.build(columnName, schema, false).setMode(mode).normalized
val v2Field = legacyFieldToV2Field(legacyField).copy(accessors = Set.empty)
val legacyField = LegacyField.build(columnName, schema, false).setMode(mode)
val v2Field = V2Field.normalize(legacyFieldToV2Field(legacyField)).copy(accessors = Set.empty)
FieldForEntity(v2Field, schemaKey, entityType)
}
.leftMap(ColumnFailure(schemaKey, entityType, _))
Expand Down Expand Up @@ -160,6 +160,24 @@ object LegacyColumns {
BadRow.LoaderIgluError(processor, BadRowFailure.LoaderIgluErrors(nel), BadPayload.LoaderPayload(event))
}.toEither

/**
 * Rewrites a field's type so that every Json type becomes a String type
 *
 * This must be run after transformation, but before handling schema evolution. It is needed
 * because the legacy loader used String columns where the v2 loader would create Json columns
 */
def dropJsonTypes(v2Field: V2Field): V2Field = {
  // Recursively walk the type tree, replacing Json wherever it appears
  def loop(fieldType: V2Type): V2Type =
    fieldType match {
      case V2Type.Struct(nested)            => V2Type.Struct(nested.map(dropJsonTypes))
      case V2Type.Array(inner, nullability) => V2Type.Array(loop(inner), nullability)
      case V2Type.Json                      => V2Type.String
      case unchanged                        => unchanged
    }

  v2Field.copy(fieldType = loop(v2Field.fieldType))
}

/**
* Convert from the legacy Field of the old v1 loader into the new style Field.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ object Processing {
(moreBad, rows) <- transformBatch[F](badProcessor, loadTstamp, events, v2NonAtomicFields, legacyFields)
fields = v2NonAtomicFields.fields.flatMap { tte =>
tte.mergedField :: tte.recoveries.map(_._2)
} ++ legacyFields.fields.map(_.field)
} ++ legacyFields.fields.map(f => LegacyColumns.dropJsonTypes(f.field))
} yield BatchAfterTransform(
rows,
fields,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
*/
package com.snowplowanalytics.snowplow.bigquery.processing

import cats.Applicative
import cats.implicits._
import cats.effect.{Async, Poll, Resource, Sync}
import com.google.api.gax.batching.FlowControlSettings
Expand Down Expand Up @@ -166,25 +165,20 @@ object Writer {
def write(rows: List[Map[String, AnyRef]]): F[WriteResult] =
Sync[F]
.delay(writer.append(new JSONArray(rows.map(_.asJava).asJava)))
.attemptNarrow[BQExceptions.AppendSerializationError]
.flatMap[WriteResult] {
case Right(fut) =>
FutureInterop
.fromFuture(fut)
.as[WriteResult](WriteResult.Success)
.recover {
case e: BQExceptions.SchemaMismatchedException =>
WriteResult.ServerSideSchemaMismatch(e)
case e: BQExceptions.StreamWriterClosedException =>
WriteResult.WriterWasClosedByEarlierError(e)
}
case Left(appendSerializationError) =>
Applicative[F].pure {
WriteResult.SerializationFailures {
appendSerializationError.getRowIndexToErrorMessage.asScala.map { case (i, cause) =>
i.toInt -> cause
}.toMap
}
.flatMap(FutureInterop.fromFuture(_))
.as[WriteResult](WriteResult.Success)
.recover {
// Some of these errors can happen either client side during `writer.append`
// ...or server-side during `FutureInterop.fromFuture(_)`
case e: BQExceptions.SchemaMismatchedException =>
WriteResult.ServerSideSchemaMismatch(e)
case e: BQExceptions.StreamWriterClosedException =>
WriteResult.WriterWasClosedByEarlierError(e)
case e: BQExceptions.AppendSerializationError =>
WriteResult.SerializationFailures {
e.getRowIndexToErrorMessage.asScala.map { case (i, cause) =>
i.toInt -> cause
}.toMap
}
}

Expand Down

0 comments on commit 2f3701b

Please sign in to comment.