From 576e0cf5286dfa9daeb225965c9dc66b26cea38e Mon Sep 17 00:00:00 2001
From: Ian Streeter
Date: Fri, 12 Jul 2024 13:59:29 +0100
Subject: [PATCH] legacy columns - simplify BatchAfterTransform

---
 .../processing/Processing.scala | 29 +++++--------------
 1 file changed, 8 insertions(+), 21 deletions(-)

diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala
index b6031af0..8646f2f0 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala
@@ -31,14 +31,7 @@ import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProce
 import com.snowplowanalytics.snowplow.sinks.ListOfList
 import com.snowplowanalytics.snowplow.runtime.syntax.foldable._
 import com.snowplowanalytics.snowplow.runtime.processing.BatchUp
-import com.snowplowanalytics.snowplow.loaders.transform.{
-  BadRowsSerializer,
-  NonAtomicFields,
-  SchemaSubVersion,
-  TabledEntity,
-  Transform,
-  TypedTabledEntity
-}
+import com.snowplowanalytics.snowplow.loaders.transform.{BadRowsSerializer, NonAtomicFields, SchemaSubVersion, TabledEntity, Transform}
 import com.snowplowanalytics.snowplow.bigquery.{AppHealth, Environment, Metrics}
 
 object Processing {
@@ -80,11 +73,7 @@ object Processing {
     *   once they have either failed or got inserted. Implemented as a Vector because we need to do
     *   lookup by index.
-    * @param v2Entities
-    *   The typed Fields for self-describing entities in this batch. Only schemas for which we use
-    *   the v2 column style
-    * @param legacyEntities
-    *   More typed Fields for self-describing entities in this batch. Only schemas for which we use
-    *   the legacy v1 column style
+    * @param entities
+    *   The typed Fields for self-describing entities in this batch.
     * @param origBatchSize
     *   The count of events in the original batch. Includes all good and bad events.
     * @param origBatchBytes
@@ -96,8 +85,7 @@
     */
   private case class BatchAfterTransform(
     toBeInserted: List[EventWithTransform],
-    v2Entities: Vector[TypedTabledEntity],
-    legacyEntities: Vector[Field],
+    entities: Vector[Field],
     origBatchBytes: Long,
     origBatchCount: Long,
     badAccumulated: ListOfList[BadRow],
@@ -163,10 +151,12 @@
       v2NonAtomicFields <- NonAtomicFields.resolveTypes[F](env.resolver, entities, env.schemasToSkip ::: env.legacyColumns)
       legacyFields <- LegacyColumns.resolveTypes[F](env.resolver, entities, env.legacyColumns)
       (moreBad, rows) <- transformBatch[F](badProcessor, loadTstamp, events, v2NonAtomicFields, legacyFields)
+      fields = v2NonAtomicFields.fields.flatMap { tte =>
+        tte.mergedField :: tte.recoveries.map(_._2)
+      } ++ legacyFields.fields.map(_.field)
     } yield BatchAfterTransform(
       rows,
-      v2NonAtomicFields.fields,
-      legacyFields.fields.map(_.field),
+      fields,
       countBytes,
       events.size + parseFailures.size,
       parseFailures.prepend(moreBad),
@@ -318,10 +308,7 @@ object Processing {
     env.writer.opened
       .use(_.descriptor)
       .flatMap { descriptor =>
-        val fields = batch.v2Entities.flatMap { tte =>
-          tte.mergedField :: tte.recoveries.map(_._2)
-        } ++ batch.legacyEntities
-        val fieldsToAdd = BigQuerySchemaUtils.alterTableRequired(descriptor, fields)
+        val fieldsToAdd = BigQuerySchemaUtils.alterTableRequired(descriptor, batch.entities)
         if (fieldsToAdd.nonEmpty) {
           env.tableManager.addColumns(fieldsToAdd.toVector).flatMap { fieldsToExist =>
             openWriterUntilFieldsExist(env, fieldsToExist)
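
For readers skimming the diff: the refactor replaces the two per-column-style vectors on BatchAfterTransform (v2Entities and legacyEntities) with a single pre-merged entities: Vector[Field], built once in the transform step instead of at alter-table time. Below is a minimal, standalone sketch of that merge. The types here are simplified stand-ins chosen for illustration only; the real Field and TypedTabledEntity come from Snowplow's loaders-common library and the field names are invented.

object EntityMergeSketch extends App {

  // Simplified stand-in types (assumptions, not the real loaders-common definitions).
  final case class Field(name: String)
  // A v2-style entity: one merged field plus recovery fields keyed by schema sub-version.
  final case class TypedTabledEntity(mergedField: Field, recoveries: List[(String, Field)])
  // A legacy (v1 column style) entity wrapping a single field.
  final case class LegacyField(field: Field)

  val v2Entities: Vector[TypedTabledEntity] = Vector(
    TypedTabledEntity(
      mergedField = Field("contexts_com_acme_thing_1"),
      recoveries  = List("1-0-1" -> Field("contexts_com_acme_thing_1_recovered_1_0_1"))
    )
  )

  val legacyEntities: Vector[LegacyField] =
    Vector(LegacyField(Field("contexts_com_acme_old_thing_1_0_0")))

  // Same shape as the new `fields = ...` binding in the patch: flatten each v2 entity into
  // its merged field plus any recovery fields, then append the legacy-style fields, giving
  // the single Vector[Field] that BatchAfterTransform now carries as `entities`.
  val entities: Vector[Field] =
    v2Entities.flatMap(tte => tte.mergedField :: tte.recoveries.map(_._2)) ++
      legacyEntities.map(_.field)

  println(entities.map(_.name).mkString(", "))
}

Carrying the already-merged Vector[Field] on the batch means the alter-table check later only needs BigQuerySchemaUtils.alterTableRequired(descriptor, batch.entities), with no knowledge of which column style produced each field.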