legacy columns - simplify BatchAfterTransform
istreeter committed Jul 12, 2024
1 parent 555fa8e commit 576e0cf
Showing 1 changed file with 8 additions and 21 deletions.
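
In short: BatchAfterTransform previously carried two vectors of typed fields (v2Entities: Vector[TypedTabledEntity] for schemas using the v2 column style, and legacyEntities: Vector[Field] for schemas using the legacy v1 column style) and merged them at the alter-table call site. After this change the merge happens once, at transform time, so the batch carries a single entities: Vector[Field] and the alter-table code no longer needs to know about the two column styles.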
@@ -31,14 +31,7 @@ import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProce
 import com.snowplowanalytics.snowplow.sinks.ListOfList
 import com.snowplowanalytics.snowplow.runtime.syntax.foldable._
 import com.snowplowanalytics.snowplow.runtime.processing.BatchUp
-import com.snowplowanalytics.snowplow.loaders.transform.{
-  BadRowsSerializer,
-  NonAtomicFields,
-  SchemaSubVersion,
-  TabledEntity,
-  Transform,
-  TypedTabledEntity
-}
+import com.snowplowanalytics.snowplow.loaders.transform.{BadRowsSerializer, NonAtomicFields, SchemaSubVersion, TabledEntity, Transform}
 import com.snowplowanalytics.snowplow.bigquery.{AppHealth, Environment, Metrics}
 
 object Processing {
@@ -80,11 +73,7 @@ object Processing {
    *   once they have either failed or got inserted. Implemented as a Vector because we need to do
    *   lookup by index.
-   * @param v2Entities
-   *   The typed Fields for self-describing entities in this batch. Only schemas for which we use
-   *   the v2 column style
-   * @param legacyEntities
-   *   More typed Fields for self-describing entities in this batch. Only schemas for which we use
-   *   the legacy v1 column style
+   * @param entities
+   *   The typed Fields for self-describing entities in this batch.
    * @param origBatchSize
    *   The count of events in the original batch. Includes all good and bad events.
    * @param origBatchBytes
@@ -96,8 +85,7 @@ object Processing {
    */
   private case class BatchAfterTransform(
     toBeInserted: List[EventWithTransform],
-    v2Entities: Vector[TypedTabledEntity],
-    legacyEntities: Vector[Field],
+    entities: Vector[Field],
     origBatchBytes: Long,
     origBatchCount: Long,
     badAccumulated: ListOfList[BadRow],
@@ -163,10 +151,12 @@ object Processing {
         v2NonAtomicFields <- NonAtomicFields.resolveTypes[F](env.resolver, entities, env.schemasToSkip ::: env.legacyColumns)
         legacyFields <- LegacyColumns.resolveTypes[F](env.resolver, entities, env.legacyColumns)
         (moreBad, rows) <- transformBatch[F](badProcessor, loadTstamp, events, v2NonAtomicFields, legacyFields)
+        fields = v2NonAtomicFields.fields.flatMap { tte =>
+                   tte.mergedField :: tte.recoveries.map(_._2)
+                 } ++ legacyFields.fields.map(_.field)
       } yield BatchAfterTransform(
         rows,
-        v2NonAtomicFields.fields,
-        legacyFields.fields.map(_.field),
+        fields,
         countBytes,
         events.size + parseFailures.size,
         parseFailures.prepend(moreBad),
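
The new fields value precomputes every column the batch can touch: each v2 TypedTabledEntity contributes its merged field plus one field per recovery sub-version, and the legacy-column fields are appended. Below is a minimal, self-contained sketch of that merge; the stand-in types are illustrative assumptions, since the real Field, TypedTabledEntity and NonAtomicFields come from Snowplow's shared transform module:

  object MergeFieldsSketch extends App {
    // Stand-ins for the real types; names and shapes are assumptions for illustration
    case class Field(name: String)
    case class TypedTabledEntity(mergedField: Field, recoveries: List[(String, Field)])
    case class LegacyField(field: Field)

    val v2Entities = Vector(
      TypedTabledEntity(
        Field("contexts_com_acme_thing_1"),
        List("1-0-1" -> Field("contexts_com_acme_thing_1_recovered_1_0_1"))
      )
    )
    val legacyFields = Vector(LegacyField(Field("contexts_com_acme_other_1_0_0")))

    // Same shape as the diff: the merged field, then its recovery fields, then legacy fields
    val fields: Vector[Field] =
      v2Entities.flatMap { tte =>
        tte.mergedField :: tte.recoveries.map(_._2)
      } ++ legacyFields.map(_.field)

    // Prints the three column names in order
    fields.foreach(f => println(f.name))
  }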
@@ -318,10 +308,7 @@ object Processing {
     env.writer.opened
       .use(_.descriptor)
       .flatMap { descriptor =>
-        val fields = batch.v2Entities.flatMap { tte =>
-          tte.mergedField :: tte.recoveries.map(_._2)
-        } ++ batch.legacyEntities
-        val fieldsToAdd = BigQuerySchemaUtils.alterTableRequired(descriptor, fields)
+        val fieldsToAdd = BigQuerySchemaUtils.alterTableRequired(descriptor, batch.entities)
         if (fieldsToAdd.nonEmpty) {
           env.tableManager.addColumns(fieldsToAdd.toVector).flatMap { fieldsToExist =>
             openWriterUntilFieldsExist(env, fieldsToExist)
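
With the fields merged up front, the schema-evolution call site reduces to one pass over batch.entities. The sketch below shows the idea with a simplified stand-in for BigQuerySchemaUtils.alterTableRequired; the real method inspects a protobuf table descriptor, whereas here the descriptor is reduced to a set of column names, purely as an assumption for illustration:

  object AlterTableSketch extends App {
    case class Field(name: String)

    // Simplified stand-in: report which batch fields are missing from the table
    def alterTableRequired(descriptorColumns: Set[String], entities: Vector[Field]): Vector[Field] =
      entities.filterNot(f => descriptorColumns.contains(f.name))

    val descriptorColumns = Set("event_id", "contexts_com_acme_thing_1")
    val batchEntities = Vector(
      Field("contexts_com_acme_thing_1"),
      Field("contexts_com_acme_other_1_0_0")
    )

    // Only the second field needs an ALTER TABLE
    println(alterTableRequired(descriptorColumns, batchEntities).map(_.name))
  }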
