diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/LegacyColumns.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/LegacyColumns.scala index 183ac088..06d85d60 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/LegacyColumns.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/LegacyColumns.scala @@ -125,8 +125,8 @@ object LegacyColumns { case TabledEntity.Context => LegacyMode.Repeated } val columnName = legacyColumnName(entityType, schemaKey) - val legacyField = LegacyField.build(columnName, schema, false).setMode(mode).normalized - val v2Field = legacyFieldToV2Field(legacyField).copy(accessors = Set.empty) + val legacyField = LegacyField.build(columnName, schema, false).setMode(mode) + val v2Field = V2Field.normalize(legacyFieldToV2Field(legacyField)).copy(accessors = Set.empty) FieldForEntity(v2Field, schemaKey, entityType) } .leftMap(ColumnFailure(schemaKey, entityType, _)) @@ -160,6 +160,24 @@ object LegacyColumns { BadRow.LoaderIgluError(processor, BadRowFailure.LoaderIgluErrors(nel), BadPayload.LoaderPayload(event)) }.toEither + /** + * Replaces any Json type with a String type + * + * This must be run after transformation, but before handling schema evolution. It is needed + * because the legacy loader used String columns where the v2 loader would create Json columns + */ + def dropJsonTypes(v2Field: V2Field): V2Field = { + def dropFromType(t: V2Type): V2Type = t match { + case V2Type.Json => V2Type.String + case V2Type.Array(element, nullability) => V2Type.Array(dropFromType(element), nullability) + case V2Type.Struct(fields) => V2Type.Struct(fields.map(dropJsonTypes)) + case other => other + } + + v2Field.copy(fieldType = dropFromType(v2Field.fieldType)) + + } + /** * Convert from the legacy Field of the old v1 loader into the new style Field. * diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala index 8646f2f0..2182d19c 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala @@ -153,7 +153,7 @@ object Processing { (moreBad, rows) <- transformBatch[F](badProcessor, loadTstamp, events, v2NonAtomicFields, legacyFields) fields = v2NonAtomicFields.fields.flatMap { tte => tte.mergedField :: tte.recoveries.map(_._2) - } ++ legacyFields.fields.map(_.field) + } ++ legacyFields.fields.map(f => LegacyColumns.dropJsonTypes(f.field)) } yield BatchAfterTransform( rows, fields, diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Writer.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Writer.scala index f4720788..c9d14282 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Writer.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Writer.scala @@ -8,7 +8,6 @@ */ package com.snowplowanalytics.snowplow.bigquery.processing -import cats.Applicative import cats.implicits._ import cats.effect.{Async, Poll, Resource, Sync} import com.google.api.gax.batching.FlowControlSettings @@ -166,25 +165,20 @@ object Writer { def write(rows: List[Map[String, AnyRef]]): F[WriteResult] = Sync[F] .delay(writer.append(new JSONArray(rows.map(_.asJava).asJava))) - .attemptNarrow[BQExceptions.AppendSerializationError] - .flatMap[WriteResult] { - case Right(fut) => - FutureInterop - .fromFuture(fut) - .as[WriteResult](WriteResult.Success) - .recover { - case e: BQExceptions.SchemaMismatchedException => - WriteResult.ServerSideSchemaMismatch(e) - case e: BQExceptions.StreamWriterClosedException => - WriteResult.WriterWasClosedByEarlierError(e) - } - case Left(appendSerializationError) => - Applicative[F].pure { - WriteResult.SerializationFailures { - appendSerializationError.getRowIndexToErrorMessage.asScala.map { case (i, cause) => - i.toInt -> cause - }.toMap - } + .flatMap(FutureInterop.fromFuture(_)) + .as[WriteResult](WriteResult.Success) + .recover { + // Some of these errors can happen either client side during `writer.append` + // ...or server-side during `FutureInterop.fromFuture(_)` + case e: BQExceptions.SchemaMismatchedException => + WriteResult.ServerSideSchemaMismatch(e) + case e: BQExceptions.StreamWriterClosedException => + WriteResult.WriterWasClosedByEarlierError(e) + case e: BQExceptions.AppendSerializationError => + WriteResult.SerializationFailures { + e.getRowIndexToErrorMessage.asScala.map { case (i, cause) => + i.toInt -> cause + }.toMap } }