From d4fe3d36b25a8c06e26fc02522070bfc247f3252 Mon Sep 17 00:00:00 2001
From: Ian Streeter
Date: Thu, 28 Mar 2024 22:35:51 +0000
Subject: [PATCH] Stay healthy if BigQuery table exceeds column limit

---
 config/config.azure.reference.hocon           |   8 +
 config/config.kinesis.reference.hocon         |   8 +
 config/config.pubsub.reference.hocon          |   8 +
 .../core/src/main/resources/reference.conf    |   3 +
 .../Alert.scala                               |  24 ++-
 .../Config.scala                              |   5 +-
 .../Environment.scala                         |   7 +-
 .../processing/BigQueryRetrying.scala         |   4 +-
 .../processing/BigQuerySchemaUtils.scala      |   6 +-
 .../processing/BigQueryUtils.scala            |  21 +-
 .../processing/Processing.scala               |   5 +-
 .../processing/TableManager.scala             | 142 +++++++-----
 .../MockEnvironment.scala                     |  16 +-
 .../processing/TableManagerSpec.scala         | 204 ++++++++++++++++++
 .../processing/WriterProviderSpec.scala       |   3 +-
 .../snowplow/bigquery/KafkaConfigSpec.scala   |   6 +-
 .../snowplow/bigquery/KinesisConfigSpec.scala |   6 +-
 .../snowplow/bigquery/PubsubConfigSpec.scala  |   6 +-
 18 files changed, 393 insertions(+), 89 deletions(-)
 create mode 100644 modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManagerSpec.scala

diff --git a/config/config.azure.reference.hocon b/config/config.azure.reference.hocon
index a0800afe..526e9cda 100644
--- a/config/config.azure.reference.hocon
+++ b/config/config.azure.reference.hocon
@@ -86,6 +86,14 @@
       "alterTableWait": {
         "delay": "1 second"
       }
+
+      # -- Relevant when the BigQuery table is close to exceeding the limit on max allowed columns in a single table.
+      # -- The loader will ignore a failure to alter the table due to too many columns, and it will continue to run.
+      # -- Some events will inevitably go to the failed events output topic until new columns have been added.
+      # -- This param configures how often the loader will retry to alter the table after an earlier failure.
+      "tooManyColumns": {
+        "delay": "300 seconds"
+      }
     }
 
     # -- Schemas that won't be loaded to BigQuery. Optional, default value []
diff --git a/config/config.kinesis.reference.hocon b/config/config.kinesis.reference.hocon
index 1e9b890a..c47abcf9 100644
--- a/config/config.kinesis.reference.hocon
+++ b/config/config.kinesis.reference.hocon
@@ -108,6 +108,14 @@
       "alterTableWait": {
         "delay": "1 second"
       }
+
+      # -- Relevant when the BigQuery table is close to exceeding the limit on max allowed columns in a single table.
+      # -- The loader will ignore a failure to alter the table due to too many columns, and it will continue to run.
+      # -- Some events will inevitably go to the failed events output topic until new columns have been added.
+      # -- This param configures how often the loader will retry to alter the table after an earlier failure.
+      "tooManyColumns": {
+        "delay": "300 seconds"
+      }
     }
 
     # -- Schemas that won't be loaded to BigQuery. Optional, default value []
diff --git a/config/config.pubsub.reference.hocon b/config/config.pubsub.reference.hocon
index 1bd57717..993dbcda 100644
--- a/config/config.pubsub.reference.hocon
+++ b/config/config.pubsub.reference.hocon
@@ -88,6 +88,14 @@
       "alterTableWait": {
         "delay": "1 second"
       }
+
+      # -- Relevant when the BigQuery table is close to exceeding the limit on max allowed columns in a single table.
+      # -- The loader will ignore a failure to alter the table due to too many columns, and it will continue to run.
+      # -- Some events will inevitably go to the failed events output topic until new columns have been added.
+      # -- This param configures how often the loader will retry to alter the table after an earlier failure.
+ "tooManyColumns": { + "delay": "300 seconds" + } } # -- Schemas that won't be loaded to BigQuery. Optional, default value [] diff --git a/modules/core/src/main/resources/reference.conf b/modules/core/src/main/resources/reference.conf index 26bba26b..3f1e1475 100644 --- a/modules/core/src/main/resources/reference.conf +++ b/modules/core/src/main/resources/reference.conf @@ -35,6 +35,9 @@ "alterTableWait": { "delay": "1 second" } + "tooManyColumns": { + "delay": "300 seconds" + } } "skipSchemas": [] diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Alert.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Alert.scala index e36e3f4c..f8aaf034 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Alert.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Alert.scala @@ -52,16 +52,26 @@ object Alert { } private implicit def throwableShow: Show[Throwable] = { - def go(acc: List[String], next: Throwable): String = { - val nextMessage = Option(next.getMessage) - val msgs = nextMessage.filterNot(msg => acc.headOption.contains(msg)) ++: acc + def removeDuplicateMessages(in: List[String]): List[String] = + in match { + case h :: t :: rest => + if (h.contains(t)) removeDuplicateMessages(h :: rest) + else if (t.contains(h)) removeDuplicateMessages(t :: rest) + else h :: removeDuplicateMessages(t :: rest) + case fewer => fewer + } - Option(next.getCause) match { - case Some(cause) => go(msgs, cause) - case None => msgs.reverse.mkString(": ") + def accumulateMessages(t: Throwable): List[String] = { + val nextMessage = Option(t.getMessage) + Option(t.getCause) match { + case Some(cause) => nextMessage.toList ::: accumulateMessages(cause) + case None => nextMessage.toList } } - Show.show(go(Nil, _)) + Show.show { t => + removeDuplicateMessages(accumulateMessages(t)).mkString(": ") + } } + } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Config.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Config.scala index 738fb0f1..ebc75b53 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Config.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Config.scala @@ -86,11 +86,13 @@ object Config { case class SetupErrorRetries(delay: FiniteDuration) case class AlterTableWaitRetries(delay: FiniteDuration) case class TransientErrorRetries(delay: FiniteDuration, attempts: Int) + case class TooManyColumnsRetries(delay: FiniteDuration) case class Retries( setupErrors: SetupErrorRetries, transientErrors: TransientErrorRetries, - alterTableWait: AlterTableWaitRetries + alterTableWait: AlterTableWaitRetries, + tooManyColumns: TooManyColumnsRetries ) implicit def decoder[Source: Decoder, Sink: Decoder]: Decoder[Config[Source, Sink]] = { @@ -119,6 +121,7 @@ object Config { implicit val setupRetries = deriveConfiguredDecoder[SetupErrorRetries] implicit val alterTableRetries = deriveConfiguredDecoder[AlterTableWaitRetries] implicit val transientRetries = deriveConfiguredDecoder[TransientErrorRetries] + implicit val tooManyColsRetries = deriveConfiguredDecoder[TooManyColumnsRetries] implicit val retriesDecoder = deriveConfiguredDecoder[Retries] // TODO add bigquery docs diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Environment.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Environment.scala index fae96c43..0135a31e 100644 --- 
a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Environment.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/Environment.scala @@ -28,7 +28,7 @@ case class Environment[F[_]]( badSink: Sink[F], resolver: Resolver[F], httpClient: Client[F], - tableManager: TableManager[F], + tableManager: TableManager.WithHandledErrors[F], writer: Writer.Provider[F], metrics: Metrics[F], appHealth: AppHealth[F], @@ -62,7 +62,8 @@ object Environment { badSink <- toSink(config.main.output.bad.sink) metrics <- Resource.eval(Metrics.build(config.main.monitoring.metrics)) creds <- Resource.eval(BigQueryUtils.credentials(config.main.output.good)) - tableManager <- Resource.eval(TableManager.make(config.main.output.good, config.main.retries, creds, appHealth, monitoring)) + tableManager <- Resource.eval(TableManager.make(config.main.output.good, creds)) + tableManagerWrapped <- Resource.eval(TableManager.withHandledErrors(tableManager, config.main.retries, appHealth, monitoring)) writerBuilder <- Writer.builder(config.main.output.good, creds) writerProvider <- Writer.provider(writerBuilder, config.main.retries, appHealth, monitoring) } yield Environment( @@ -71,7 +72,7 @@ object Environment { badSink = badSink, resolver = resolver, httpClient = httpClient, - tableManager = tableManager, + tableManager = tableManagerWrapped, writer = writerProvider, metrics = metrics, appHealth = appHealth, diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQueryRetrying.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQueryRetrying.scala index ff33a2b7..a3c2b7e7 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQueryRetrying.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQueryRetrying.scala @@ -12,12 +12,14 @@ import cats.Applicative import cats.effect.Sync import cats.implicits._ import com.google.api.gax.rpc.PermissionDeniedException +import com.google.cloud.bigquery.BigQueryException import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger import retry._ import retry.implicits.retrySyntaxError import com.snowplowanalytics.snowplow.bigquery.{Alert, AppHealth, Config, Monitoring} +import com.snowplowanalytics.snowplow.bigquery.processing.BigQueryUtils.BQExceptionSyntax object BigQueryRetrying { @@ -54,7 +56,7 @@ object BigQueryRetrying { ) private def isSetupError[F[_]: Sync](t: Throwable): F[Boolean] = t match { - case BigQueryUtils.BQExceptionWithLowerCaseReason("notfound" | "accessdenied") => + case bqe: BigQueryException if Set("notfound", "accessdenied").contains(bqe.lowerCaseReason) => true.pure[F] case _: PermissionDeniedException => true.pure[F] diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQuerySchemaUtils.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQuerySchemaUtils.scala index 8b06148a..4d90d2e9 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQuerySchemaUtils.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQuerySchemaUtils.scala @@ -16,8 +16,8 @@ import scala.jdk.CollectionConverters._ object BigQuerySchemaUtils { - def alterTableRequired(tableDescriptor: Descriptors.Descriptor, ddlFields: Seq[Field]): Boolean = - ddlFields.exists { field => + def alterTableRequired(tableDescriptor: 
Descriptors.Descriptor, ddlFields: Seq[Field]): Seq[Field] = + ddlFields.filter { field => Option(tableDescriptor.findFieldByName(field.name)) match { case Some(fieldDescriptor) => val nullableMismatch = fieldDescriptor.isRequired && field.nullability.nullable @@ -32,7 +32,7 @@ object BigQuerySchemaUtils { case Descriptors.FieldDescriptor.Type.MESSAGE => ddlField.fieldType match { case Type.Struct(nestedFields) => - alterTableRequired(tableField.getMessageType, nestedFields) + alterTableRequired(tableField.getMessageType, nestedFields).nonEmpty case _ => false } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQueryUtils.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQueryUtils.scala index d37e7f0a..8bbe5897 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQueryUtils.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/BigQueryUtils.scala @@ -35,13 +35,18 @@ object BigQueryUtils { def streamIdOf(config: Config.BigQuery): String = tableIdOf(config).getIAMResourceName + "/streams/_default" - object BQExceptionWithLowerCaseReason { - def unapply(bqe: BigQueryException): Option[(String)] = - Option(bqe.getError()) match { - case Some(bqError) => - Some(bqError.getReason.toLowerCase) - case None => - None - } + implicit class BQExceptionSyntax(val bqe: BigQueryException) extends AnyVal { + def lowerCaseReason: String = + Option(bqe.getError()) + .flatMap(e => Option(e.getReason)) + .map(_.toLowerCase) + .getOrElse("") + + def lowerCaseMessage: String = + Option(bqe.getError()) + .flatMap(e => Option(e.getReason)) + .map(_.toLowerCase) + .getOrElse("") + } } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala index f2269e88..f66da7e9 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/Processing.scala @@ -297,8 +297,9 @@ object Processing { val fields = batch.entities.fields.flatMap { tte => tte.mergedField :: tte.recoveries.map(_._2) } - if (BigQuerySchemaUtils.alterTableRequired(descriptor, fields)) { - env.tableManager.addColumns(fields) *> env.writer.closed.use_ + val fieldsToAdd = BigQuerySchemaUtils.alterTableRequired(descriptor, fields) + if (fieldsToAdd.nonEmpty) { + env.tableManager.addColumns(fieldsToAdd.toVector) *> env.writer.closed.use_ } else { Sync[F].unit } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManager.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManager.scala index cc6f65ce..7fdc68de 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManager.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManager.scala @@ -10,7 +10,8 @@ package com.snowplowanalytics.snowplow.bigquery.processing import cats.Show import cats.implicits._ -import cats.effect.{Async, Sync} +import cats.effect.implicits._ +import cats.effect.{Async, Ref, Sync} import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger import com.google.cloud.bigquery.{ @@ -25,16 +26,18 @@ import com.google.cloud.bigquery.{ TimePartitioning } import 
com.google.auth.Credentials +import com.google.cloud.bigquery.BigQueryException import com.snowplowanalytics.iglu.schemaddl.parquet.Field import com.snowplowanalytics.snowplow.loaders.transform.AtomicFields import com.snowplowanalytics.snowplow.bigquery.{Alert, AppHealth, Config, Monitoring} +import com.snowplowanalytics.snowplow.bigquery.processing.BigQueryUtils.BQExceptionSyntax import scala.jdk.CollectionConverters._ trait TableManager[F[_]] { - def addColumns(columns: Vector[Field]): F[Unit] + def addColumns(columns: Vector[Field]): F[TableManager.AddColumnsResult] def createTable: F[Unit] @@ -44,75 +47,114 @@ object TableManager { private implicit def logger[F[_]: Sync] = Slf4jLogger.getLogger[F] + sealed trait AddColumnsResult + object AddColumnsResult { + case object Success extends AddColumnsResult + case class TooManyColumnsInTable(t: Throwable) extends AddColumnsResult + } + + trait WithHandledErrors[F[_]] { + def addColumns(columns: Vector[Field]): F[Unit] + def createTable: F[Unit] + } + def make[F[_]: Async]( config: Config.BigQuery, - retries: Config.Retries, - credentials: Credentials, - appHealth: AppHealth[F], - monitoring: Monitoring[F] + credentials: Credentials ): F[TableManager[F]] = for { client <- Sync[F].delay(BigQueryOptions.newBuilder.setCredentials(credentials).build.getService) - } yield impl(config, retries, client, appHealth, monitoring) + } yield impl(config, client) - private def impl[F[_]: Async]( - config: Config.BigQuery, + def withHandledErrors[F[_]: Async]( + underlying: TableManager[F], retries: Config.Retries, - client: BigQuery, appHealth: AppHealth[F], monitoring: Monitoring[F] - ): TableManager[F] = new TableManager[F] { + ): F[WithHandledErrors[F]] = + for { + addingColumnsEnabled <- Ref[F].of[Boolean](true) + } yield new WithHandledErrors[F] { + def addColumns(columns: Vector[Field]): F[Unit] = + addingColumnsEnabled.get.flatMap { + case true => + BigQueryRetrying + .withRetries(appHealth, retries, monitoring, Alert.FailedToAddColumns(columns.map(_.name), _)) { + Logger[F].info(s"Altering table to add columns [${showColumns(columns)}]") *> + underlying.addColumns(columns) + } + .flatMap { + case AddColumnsResult.TooManyColumnsInTable(t) => + val enableAfterDelay = Async[F].sleep(retries.tooManyColumns.delay) *> addingColumnsEnabled.set(true) + for { + _ <- Logger[F].error(t)(s"Could not alter table schema because of too many columns") + _ <- monitoring.alert(Alert.FailedToAddColumns(columns.map(_.name), t)) + _ <- addingColumnsEnabled.set(false) + _ <- enableAfterDelay.start + } yield () + case _ => + Async[F].unit + } + case false => + Async[F].unit + } + + def createTable: F[Unit] = + BigQueryRetrying.withRetries(appHealth, retries, monitoring, Alert.FailedToCreateEventsTable(_)) { + underlying.createTable + } + } - def addColumns(columns: Vector[Field]): F[Unit] = - BigQueryRetrying.withRetries(appHealth, retries, monitoring, Alert.FailedToAddColumns(columns.map(_.name), _)) { - Logger[F].info(s"Altering table to add columns [${showColumns(columns)}]") *> - addColumnsImpl(config, client, columns) - } + private def impl[F[_]: Async]( + config: Config.BigQuery, + client: BigQuery + ): TableManager[F] = new TableManager[F] { - def createTable: F[Unit] = - BigQueryRetrying.withRetries(appHealth, retries, monitoring, Alert.FailedToCreateEventsTable(_)) { - val tableInfo = atomicTableInfo(config) - Logger[F].info(show"Creating table $tableInfo") *> - Sync[F] - .blocking(client.create(tableInfo)) - .void - .recoverWith { - case bqe @ 
BigQueryUtils.BQExceptionWithLowerCaseReason("duplicate") => - // Table already exists - Logger[F].info(s"Ignoring error when creating table: ${bqe.getMessage}") - case BigQueryUtils.BQExceptionWithLowerCaseReason("accessdenied") => - Logger[F].info(s"Access denied when trying to create table. Will ignore error and assume table already exists.") - } - } + def addColumns(columns: Vector[Field]): F[TableManager.AddColumnsResult] = + for { + table <- Sync[F].blocking(client.getTable(config.dataset, config.table)) + schema <- Sync[F].pure(table.getDefinition[TableDefinition].getSchema) + fields <- Sync[F].pure(schema.getFields) + fields <- Sync[F].pure(BigQuerySchemaUtils.mergeInColumns(fields, columns)) + schema <- Sync[F].pure(Schema.of(fields)) + table <- Sync[F].pure(setTableSchema(table, schema)) + result <- Sync[F] + .blocking(table.update()) + .as[AddColumnsResult](AddColumnsResult.Success) + .recover(handleTooManyColumns) + .onError(logOnRaceCondition) + } yield result + + def createTable: F[Unit] = { + val tableInfo = atomicTableInfo(config) + Logger[F].info(show"Creating table $tableInfo") *> + Sync[F] + .blocking(client.create(tableInfo)) + .void + .recoverWith { + case bqe: BigQueryException if bqe.lowerCaseReason === "duplicate" => + // Table already exists + Logger[F].info(s"Ignoring error when creating table: ${bqe.getMessage}") + case bqe: BigQueryException if bqe.lowerCaseReason === "accessdenied" => + Logger[F].info(s"Access denied when trying to create table. Will ignore error and assume table already exists.") + } + } } - private def addColumnsImpl[F[_]: Sync]( - config: Config.BigQuery, - client: BigQuery, - columns: Vector[Field] - ): F[Unit] = - for { - table <- Sync[F].blocking(client.getTable(config.dataset, config.table)) - schema <- Sync[F].pure(table.getDefinition[TableDefinition].getSchema) - fields <- Sync[F].pure(schema.getFields) - fields <- Sync[F].pure(BigQuerySchemaUtils.mergeInColumns(fields, columns)) - schema <- Sync[F].pure(Schema.of(fields)) - table <- Sync[F].pure(setTableSchema(table, schema)) - _ <- Sync[F] - .blocking(table.update()) - .void - .onError(logOnRaceCondition) - } yield () - private def setTableSchema(table: Table, schema: Schema): Table = table.toBuilder().setDefinition(StandardTableDefinition.of(schema)).build() private def logOnRaceCondition[F[_]: Sync]: PartialFunction[Throwable, F[Unit]] = { - case BigQueryUtils.BQExceptionWithLowerCaseReason("invalid") => + case bqe: BigQueryException if bqe.lowerCaseReason === "invalid" => Logger[F].warn(s"Caught known exception which probably means another loader has already altered the table.") // Don't do anything else; the BigQueryRetrying will handle retries and logging the exception. 
   }
 
+  private def handleTooManyColumns: PartialFunction[Throwable, AddColumnsResult] = {
+    case bqe: BigQueryException if bqe.lowerCaseReason === "invalid" && bqe.lowerCaseMessage.startsWith("too many columns") =>
+      AddColumnsResult.TooManyColumnsInTable(bqe)
+  }
+
   private def showColumns(columns: Vector[Field]): String =
     columns.map(_.name).mkString(", ")
diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/MockEnvironment.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/MockEnvironment.scala
index 48a5ccba..0a3b33aa 100644
--- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/MockEnvironment.scala
+++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/MockEnvironment.scala
@@ -111,13 +111,14 @@ object MockEnvironment {
     def cloud = "OnPrem"
   }
 
-  private def testTableManager(state: Ref[IO, Vector[Action]]): TableManager[IO] = new TableManager[IO] {
-    def addColumns(columns: Vector[Field]): IO[Unit] =
-      state.update(_ :+ AlterTableAddedColumns(columns.map(_.name)))
+  private def testTableManager(state: Ref[IO, Vector[Action]]): TableManager.WithHandledErrors[IO] =
+    new TableManager.WithHandledErrors[IO] {
+      def addColumns(columns: Vector[Field]): IO[Unit] =
+        state.update(_ :+ AlterTableAddedColumns(columns.map(_.name)))
 
-    def createTable: IO[Unit] =
-      state.update(_ :+ CreatedTable)
-  }
+      def createTable: IO[Unit] =
+        state.update(_ :+ CreatedTable)
+    }
 
   private def testSourceAndAck(inputs: List[TokenedEvents], state: Ref[IO, Vector[Action]]): SourceAndAck[IO] =
     new SourceAndAck[IO] {
@@ -217,6 +218,7 @@ object MockEnvironment {
   def retriesConfig = Config.Retries(
     Config.SetupErrorRetries(30.seconds),
     Config.TransientErrorRetries(1.second, 5),
-    Config.AlterTableWaitRetries(1.second)
+    Config.AlterTableWaitRetries(1.second),
+    Config.TooManyColumnsRetries(300.seconds)
   )
 }
diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManagerSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManagerSpec.scala
new file mode 100644
index 00000000..e4ea800e
--- /dev/null
+++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/TableManagerSpec.scala
@@ -0,0 +1,204 @@
+/**
+ * Copyright (c) 2013-present Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This software is made available by Snowplow Analytics, Ltd., under the terms of the Snowplow
+ * Limited Use License Agreement, Version 1.0 located at
+ * https://docs.snowplow.io/limited-use-license-1.0 BY INSTALLING, DOWNLOADING, ACCESSING, USING OR
+ * DISTRIBUTING ANY PORTION OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
+ */
+package com.snowplowanalytics.snowplow.bigquery.processing
+
+import cats.implicits._
+import cats.effect.{IO, Ref}
+import org.specs2.Specification
+import cats.effect.testing.specs2.CatsEffect
+import cats.effect.testkit.TestControl
+
+import scala.concurrent.duration.{DurationLong, FiniteDuration}
+
+import com.snowplowanalytics.iglu.schemaddl.parquet.{Field, Type}
+import com.snowplowanalytics.snowplow.bigquery.{Alert, AppHealth, Config, Monitoring}
+import com.snowplowanalytics.snowplow.runtime.HealthProbe
+import com.snowplowanalytics.snowplow.bigquery.AppHealth.Service
+import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck}
+
+class TableManagerSpec extends Specification with CatsEffect {
+  import TableManagerSpec._
+
+  def is = s2"""
+  The table manager
+    Add columns when told to $e1
+    Retry adding columns if there is a transient exception, with limited number of attempts and no monitoring alerts $e2
+  """
+
+  def e1 = control().flatMap { c =>
+    val testFields1 = Vector(
+      Field("f1", Type.String, Type.Nullability.Nullable, Set("f1")),
+      Field("f2", Type.Integer, Type.Nullability.Required, Set("f2"))
+    )
+
+    val testFields2 = Vector(
+      Field("f3", Type.String, Type.Nullability.Nullable, Set("f3")),
+      Field("f4", Type.Integer, Type.Nullability.Required, Set("f4"))
+    )
+
+    val io = for {
+      tableManager <- TableManager.withHandledErrors(c.tableManager, retriesConfig, c.appHealth, c.monitoring)
+      _ <- tableManager.addColumns(testFields1)
+      _ <- tableManager.addColumns(testFields2)
+    } yield ()
+
+    val expected = Vector(
+      Action.AddColumnsAttempted(testFields1),
+      Action.AddColumnsAttempted(testFields2)
+    )
+
+    for {
+      _ <- io
+      state <- c.state.get
+      health <- c.appHealth.status
+    } yield List(
+      state should beEqualTo(expected),
+      health should beHealthy
+    ).reduce(_ and _)
+  }
+
+  def e2 = {
+    val mocks = List.fill(100)(Response.ExceptionThrown(new RuntimeException("Boom!")))
+    control(Mocks(addColumnsResults = mocks)).flatMap { c =>
+      val testFields = Vector(
+        Field("f1", Type.String, Type.Nullability.Nullable, Set("f1")),
+        Field("f2", Type.Integer, Type.Nullability.Required, Set("f2"))
+      )
+
+      val io = for {
+        tableManager <- TableManager.withHandledErrors(c.tableManager, retriesConfig, c.appHealth, c.monitoring)
+        _ <- tableManager.addColumns(testFields)
+      } yield ()
+
+      val expected = Vector(
+        Action.AddColumnsAttempted(testFields),
+        Action.AddColumnsAttempted(testFields),
+        Action.AddColumnsAttempted(testFields),
+        Action.AddColumnsAttempted(testFields),
+        Action.AddColumnsAttempted(testFields)
+      )
+
+      val test = for {
+        _ <- io.voidError
+        state <- c.state.get
+        health <- c.appHealth.status
+      } yield List(
+        state should beEqualTo(expected),
+        health should beUnhealthy
+      ).reduce(_ and _)
+
+      TestControl.executeEmbed(test)
+    }
+  }
+
+  /** Convenience matchers for health probe * */
+
+  def beHealthy: org.specs2.matcher.Matcher[HealthProbe.Status] = { (status: HealthProbe.Status) =>
+    val result = status match {
+      case HealthProbe.Healthy      => true
+      case HealthProbe.Unhealthy(_) => false
+    }
+    (result, s"$status is not healthy")
+  }
+
+  def beUnhealthy: org.specs2.matcher.Matcher[HealthProbe.Status] = { (status: HealthProbe.Status) =>
+    val result = status match {
+      case HealthProbe.Healthy      => false
+      case HealthProbe.Unhealthy(_) => true
+    }
+    (result, s"$status is not unhealthy")
+  }
+}
+
+object TableManagerSpec {
+
+  sealed trait Action
+
+  object Action {
+    case class AddColumnsAttempted(columns: Vector[Field]) extends Action
+    case class SentAlert(timeSentSeconds: Long) extends Action
+  }
+
+  sealed trait Response[+A]
+  object Response {
+    final case class Success[A](value: A) extends Response[A]
+    final case class ExceptionThrown(value: Throwable) extends Response[Nothing]
+  }
+
+  case class Mocks(addColumnsResults: List[Response[TableManager.AddColumnsResult]])
+
+  case class Control(
+    state: Ref[IO, Vector[Action]],
+    tableManager: TableManager[IO],
+    appHealth: AppHealth[IO],
+    monitoring: Monitoring[IO]
+  )
+
+  def retriesConfig = Config.Retries(
+    Config.SetupErrorRetries(30.seconds),
+    Config.TransientErrorRetries(1.second, 5),
+    Config.AlterTableWaitRetries(1.second),
+    Config.TooManyColumnsRetries(300.seconds)
+  )
+
+  def control(mocks: Mocks = Mocks(Nil)): IO[Control] =
+    for {
+      state <- Ref[IO].of(Vector.empty[Action])
+      appHealth <- testAppHealth()
+      tableManager <- testTableManager(state, mocks.addColumnsResults)
+    } yield Control(state, tableManager, appHealth, testMonitoring(state))
+
+  private def testAppHealth(): IO[AppHealth[IO]] = {
+    val everythingHealthy: Map[AppHealth.Service, Boolean] = Map(Service.BigQueryClient -> true, Service.BadSink -> true)
+    val healthySource = new SourceAndAck[IO] {
+      override def stream(config: EventProcessingConfig, processor: EventProcessor[IO]): fs2.Stream[IO, Nothing] =
+        fs2.Stream.empty
+
+      override def isHealthy(maxAllowedProcessingLatency: FiniteDuration): IO[SourceAndAck.HealthStatus] =
+        IO(SourceAndAck.Healthy)
+    }
+    AppHealth.init(10.seconds, healthySource, everythingHealthy)
+  }
+
+  private def testTableManager(state: Ref[IO, Vector[Action]], mocks: List[Response[TableManager.AddColumnsResult]]): IO[TableManager[IO]] =
+    for {
+      mocksRef <- Ref[IO].of(mocks)
+    } yield new TableManager[IO] {
+      def addColumns(columns: Vector[Field]): IO[TableManager.AddColumnsResult] =
+        for {
+          response <- mocksRef.modify {
+                        case head :: tail => (tail, head)
+                        case Nil          => (Nil, Response.Success(TableManager.AddColumnsResult.Success))
+                      }
+          _ <- state.update(_ :+ Action.AddColumnsAttempted(columns))
+          result <- response match {
+                      case success: Response.Success[TableManager.AddColumnsResult] =>
+                        IO.pure(success.value)
+                      case Response.ExceptionThrown(ex) =>
+                        IO.raiseError(ex).adaptError { t =>
+                          t.setStackTrace(Array()) // don't clutter our test logs
+                          t
+                        }
+                    }
+        } yield result
+
+      def createTable: IO[Unit] =
+        IO.unit
+
+    }
+
+  private def testMonitoring(state: Ref[IO, Vector[Action]]): Monitoring[IO] = new Monitoring[IO] {
+    def alert(message: Alert): IO[Unit] =
+      for {
+        now <- IO.realTime
+        _ <- state.update(_ :+ Action.SentAlert(now.toSeconds))
+      } yield ()
+  }
+
+}
diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/WriterProviderSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/WriterProviderSpec.scala
index 122a1aad..4203dc72 100644
--- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/WriterProviderSpec.scala
+++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.bigquery/processing/WriterProviderSpec.scala
@@ -318,7 +318,8 @@ object WriterProviderSpec {
   def retriesConfig = Config.Retries(
     Config.SetupErrorRetries(30.seconds),
     Config.TransientErrorRetries(1.second, 5),
-    Config.AlterTableWaitRetries(1.second)
+    Config.AlterTableWaitRetries(1.second),
+    Config.TooManyColumnsRetries(300.seconds)
   )
 
   def control: IO[Control] =
diff --git a/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/bigquery/KafkaConfigSpec.scala b/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/bigquery/KafkaConfigSpec.scala
index 935f7fa6..401d6371 100644
--- a/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/bigquery/KafkaConfigSpec.scala
+++ b/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/bigquery/KafkaConfigSpec.scala
@@ -102,7 +102,8 @@ object KafkaConfigSpec {
     retries = Config.Retries(
       setupErrors = Config.SetupErrorRetries(delay = 30.seconds),
       transientErrors = Config.TransientErrorRetries(delay = 1.second, attempts = 5),
-      alterTableWait = Config.AlterTableWaitRetries(delay = 1.second)
+      alterTableWait = Config.AlterTableWaitRetries(delay = 1.second),
+      tooManyColumns = Config.TooManyColumnsRetries(delay = 300.seconds)
     ),
     telemetry = Telemetry.Config(
       disable = false,
@@ -170,7 +171,8 @@ object KafkaConfigSpec {
     retries = Config.Retries(
       setupErrors = Config.SetupErrorRetries(delay = 30.seconds),
       transientErrors = Config.TransientErrorRetries(delay = 1.second, attempts = 5),
-      alterTableWait = Config.AlterTableWaitRetries(delay = 1.second)
+      alterTableWait = Config.AlterTableWaitRetries(delay = 1.second),
+      tooManyColumns = Config.TooManyColumnsRetries(delay = 300.seconds)
     ),
     telemetry = Telemetry.Config(
       disable = false,
diff --git a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/bigquery/KinesisConfigSpec.scala b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/bigquery/KinesisConfigSpec.scala
index f69708c2..eb4b2ff9 100644
--- a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/bigquery/KinesisConfigSpec.scala
+++ b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/bigquery/KinesisConfigSpec.scala
@@ -100,7 +100,8 @@ object KinesisConfigSpec {
     retries = Config.Retries(
       setupErrors = Config.SetupErrorRetries(delay = 30.seconds),
       transientErrors = Config.TransientErrorRetries(delay = 1.second, attempts = 5),
-      alterTableWait = Config.AlterTableWaitRetries(delay = 1.second)
+      alterTableWait = Config.AlterTableWaitRetries(delay = 1.second),
+      tooManyColumns = Config.TooManyColumnsRetries(delay = 300.seconds)
    ),
     telemetry = Telemetry.Config(
       disable = false,
@@ -165,7 +166,8 @@ object KinesisConfigSpec {
     retries = Config.Retries(
       setupErrors = Config.SetupErrorRetries(delay = 30.seconds),
       transientErrors = Config.TransientErrorRetries(delay = 1.second, attempts = 5),
-      alterTableWait = Config.AlterTableWaitRetries(delay = 1.second)
+      alterTableWait = Config.AlterTableWaitRetries(delay = 1.second),
+      tooManyColumns = Config.TooManyColumnsRetries(delay = 300.seconds)
     ),
     telemetry = Telemetry.Config(
       disable = false,
diff --git a/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/bigquery/PubsubConfigSpec.scala b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/bigquery/PubsubConfigSpec.scala
index 4b7925fd..dcfdf78b 100644
--- a/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/bigquery/PubsubConfigSpec.scala
+++ b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/bigquery/PubsubConfigSpec.scala
@@ -97,7 +97,8 @@ object PubsubConfigSpec {
     retries = Config.Retries(
       setupErrors = Config.SetupErrorRetries(delay = 30.seconds),
       transientErrors = Config.TransientErrorRetries(delay = 1.second, attempts = 5),
-      alterTableWait = Config.AlterTableWaitRetries(delay = 1.second)
+      alterTableWait = Config.AlterTableWaitRetries(delay = 1.second),
+      tooManyColumns = Config.TooManyColumnsRetries(delay = 300.seconds)
     ),
     telemetry = Telemetry.Config(
       disable = false,
@@ -158,7 +159,8 @@ object PubsubConfigSpec {
     retries = Config.Retries(
       setupErrors = Config.SetupErrorRetries(delay = 30.seconds),
       transientErrors = Config.TransientErrorRetries(delay = 1.second, attempts = 5),
-      alterTableWait = Config.AlterTableWaitRetries(delay = 1.second)
+      alterTableWait = Config.AlterTableWaitRetries(delay = 1.second),
+      tooManyColumns = Config.TooManyColumnsRetries(delay = 300.seconds)
     ),
     telemetry = Telemetry.Config(
       disable = false,
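
Note on the error-handling pattern introduced in TableManager.withHandledErrors above: addColumns is gated behind a Ref[F, Boolean]. When an alter-table call fails because the table is at BigQuery's column cap (currently 10,000 columns per table), the wrapper alerts, flips the gate off so subsequent batches skip the alter and go straight to the failed events output, and starts a background fiber that flips the gate back on after retries.tooManyColumns.delay. The loader therefore degrades gracefully instead of crash-looping. The snippet below is a minimal, self-contained cats-effect sketch of that gate only; the names (ColumnGate, guarded) are hypothetical and it is not code from this patch:

    import cats.effect.{IO, IOApp, Ref}
    import cats.syntax.all._
    import scala.concurrent.duration._

    object ColumnGate extends IOApp.Simple {

      // Run `action` only while the gate is enabled. On failure, disable the
      // gate immediately and re-enable it in the background after `reopenAfter`.
      def guarded(enabled: Ref[IO, Boolean], reopenAfter: FiniteDuration)(action: IO[Unit]): IO[Unit] =
        enabled.get.flatMap {
          case false => IO.unit // gate is off: skip the alter-table attempt entirely
          case true =>
            action.handleErrorWith { e =>
              IO.println(s"disabling after failure: ${e.getMessage}") *>
                enabled.set(false) *>
                (IO.sleep(reopenAfter) *> enabled.set(true)).start.void // fire-and-forget re-enable
            }
        }

      def run: IO[Unit] =
        for {
          enabled <- Ref[IO].of(true)
          boom = IO.raiseError[Unit](new RuntimeException("too many columns"))
          _ <- guarded(enabled, 1.second)(boom)                      // fails; gate turns off
          _ <- guarded(enabled, 1.second)(IO.println("never runs"))  // no-op while gate is off
          _ <- IO.sleep(1500.millis)                                 // background fiber re-enables the gate
          _ <- guarded(enabled, 1.second)(IO.println("runs again"))
        } yield ()
    }

One design point worth noting: the real wrapper performs the same check-then-act sequence on the Ref without a lock, so a concurrent batch may still attempt one extra alter before the gate closes; that is harmless here because a duplicate failure just takes the same branch again.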