Skip to content

Commit

Permalink
Stay healthy if BigQuery table exceeds column limit
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Apr 1, 2024
1 parent b16153f commit ca4c710
Show file tree
Hide file tree
Showing 18 changed files with 591 additions and 89 deletions.
8 changes: 8 additions & 0 deletions config/config.azure.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@
"alterTableWait": {
"delay": "1 second"
}

# -- Relevant when the BigQuery table is close to exceeding the limit on max allowed columns in a single table.
# -- The loader will ignore a failure to alter the table due to too many columns, and it will continue to run.
# -- Some events will inevitably go to the failed events output topic until new columns have been added.
# -- This param configures how often the loader will retry to alter the table after an earlier failure.
"tooManyColumns": {
"delay": "300 seconds"
}
}

# -- Schemas that won't be loaded to BigQuery. Optional, default value []
Expand Down
8 changes: 8 additions & 0 deletions config/config.kinesis.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@
"alterTableWait": {
"delay": "1 second"
}

# -- Relevant when the BigQuery table is close to exceeding the limit on max allowed columns in a single table.
# -- The loader will ignore a failure to alter the table due to too many columns, and it will continue to run.
# -- Some events will inevitably go to the failed events output topic until new columns have been added.
# -- This param configures how often the loader will retry to alter the table after an earlier failure.
"tooManyColumns": {
"delay": "300 seconds"
}
}

# -- Schemas that won't be loaded to BigQuery. Optional, default value []
Expand Down
8 changes: 8 additions & 0 deletions config/config.pubsub.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@
"alterTableWait": {
"delay": "1 second"
}

# -- Relevant when the BigQuery table is close to exceeding the limit on max allowed columns in a single table.
# -- The loader will ignore a failure to alter the table due to too many columns, and it will continue to run.
# -- Some events will inevitably go to the failed events output topic until new columns have been added.
# -- This param configures how often the loader will retry to alter the table after an earlier failure.
"tooManyColumns": {
"delay": "300 seconds"
}
}

# -- Schemas that won't be loaded to BigQuery. Optional, default value []
Expand Down
3 changes: 3 additions & 0 deletions modules/core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
"alterTableWait": {
"delay": "1 second"
}
"tooManyColumns": {
"delay": "300 seconds"
}
}

"skipSchemas": []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,26 @@ object Alert {
}

private implicit def throwableShow: Show[Throwable] = {
def go(acc: List[String], next: Throwable): String = {
val nextMessage = Option(next.getMessage)
val msgs = nextMessage.filterNot(msg => acc.headOption.contains(msg)) ++: acc
// Collapses neighbouring messages when one of them already contains the other,
// keeping only the containing message. Lists with fewer than two entries are returned as-is.
def removeDuplicateMessages(in: List[String]): List[String] =
  in match {
    case first :: second :: tail if first.contains(second) =>
      // The second message adds nothing new; compare the first against what follows.
      removeDuplicateMessages(first :: tail)
    case first :: second :: tail if second.contains(first) =>
      // The first message is fully repeated inside the second; keep the second only.
      removeDuplicateMessages(second :: tail)
    case first :: remaining @ (_ :: _) =>
      first :: removeDuplicateMessages(remaining)
    case fewer =>
      fewer
  }

Option(next.getCause) match {
case Some(cause) => go(msgs, cause)
case None => msgs.reverse.mkString(": ")
// Collects the non-null messages of `t` and of every throwable in its cause chain,
// ordered from the outermost throwable to the innermost cause.
def accumulateMessages(t: Throwable): List[String] = {
  val ownMessage = Option(t.getMessage).toList
  val causeMessages = Option(t.getCause).fold(List.empty[String])(accumulateMessages)
  ownMessage ::: causeMessages
}

Show.show(go(Nil, _))
Show.show { t =>
removeDuplicateMessages(accumulateMessages(t)).mkString(": ")
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,13 @@ object Config {
case class SetupErrorRetries(delay: FiniteDuration)
case class AlterTableWaitRetries(delay: FiniteDuration)
case class TransientErrorRetries(delay: FiniteDuration, attempts: Int)
// Retry settings for when altering the table fails because it has too many columns:
// `delay` configures how often the loader retries the alter-table after an earlier failure.
case class TooManyColumnsRetries(delay: FiniteDuration)

case class Retries(
setupErrors: SetupErrorRetries,
transientErrors: TransientErrorRetries,
alterTableWait: AlterTableWaitRetries
alterTableWait: AlterTableWaitRetries,
tooManyColumns: TooManyColumnsRetries
)

implicit def decoder[Source: Decoder, Sink: Decoder]: Decoder[Config[Source, Sink]] = {
Expand Down Expand Up @@ -119,6 +121,7 @@ object Config {
implicit val setupRetries = deriveConfiguredDecoder[SetupErrorRetries]
implicit val alterTableRetries = deriveConfiguredDecoder[AlterTableWaitRetries]
implicit val transientRetries = deriveConfiguredDecoder[TransientErrorRetries]
implicit val tooManyColsRetries = deriveConfiguredDecoder[TooManyColumnsRetries]
implicit val retriesDecoder = deriveConfiguredDecoder[Retries]

// TODO add bigquery docs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ case class Environment[F[_]](
badSink: Sink[F],
resolver: Resolver[F],
httpClient: Client[F],
tableManager: TableManager[F],
tableManager: TableManager.WithHandledErrors[F],
writer: Writer.Provider[F],
metrics: Metrics[F],
appHealth: AppHealth[F],
Expand Down Expand Up @@ -62,7 +62,8 @@ object Environment {
badSink <- toSink(config.main.output.bad.sink)
metrics <- Resource.eval(Metrics.build(config.main.monitoring.metrics))
creds <- Resource.eval(BigQueryUtils.credentials(config.main.output.good))
tableManager <- Resource.eval(TableManager.make(config.main.output.good, config.main.retries, creds, appHealth, monitoring))
tableManager <- Resource.eval(TableManager.make(config.main.output.good, creds))
tableManagerWrapped <- Resource.eval(TableManager.withHandledErrors(tableManager, config.main.retries, appHealth, monitoring))
writerBuilder <- Writer.builder(config.main.output.good, creds)
writerProvider <- Writer.provider(writerBuilder, config.main.retries, appHealth, monitoring)
} yield Environment(
Expand All @@ -71,7 +72,7 @@ object Environment {
badSink = badSink,
resolver = resolver,
httpClient = httpClient,
tableManager = tableManager,
tableManager = tableManagerWrapped,
writer = writerProvider,
metrics = metrics,
appHealth = appHealth,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ import cats.Applicative
import cats.effect.Sync
import cats.implicits._
import com.google.api.gax.rpc.PermissionDeniedException
import com.google.cloud.bigquery.BigQueryException
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import retry._
import retry.implicits.retrySyntaxError

import com.snowplowanalytics.snowplow.bigquery.{Alert, AppHealth, Config, Monitoring}
import com.snowplowanalytics.snowplow.bigquery.processing.BigQueryUtils.BQExceptionSyntax

object BigQueryRetrying {

Expand Down Expand Up @@ -54,7 +56,7 @@ object BigQueryRetrying {
)

private def isSetupError[F[_]: Sync](t: Throwable): F[Boolean] = t match {
case BigQueryUtils.BQExceptionWithLowerCaseReason("notfound" | "accessdenied") =>
case bqe: BigQueryException if Set("notfound", "accessdenied").contains(bqe.lowerCaseReason) =>
true.pure[F]
case _: PermissionDeniedException =>
true.pure[F]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import scala.jdk.CollectionConverters._

object BigQuerySchemaUtils {

def alterTableRequired(tableDescriptor: Descriptors.Descriptor, ddlFields: Seq[Field]): Boolean =
ddlFields.exists { field =>
def alterTableRequired(tableDescriptor: Descriptors.Descriptor, ddlFields: Seq[Field]): Seq[Field] =
ddlFields.filter { field =>
Option(tableDescriptor.findFieldByName(field.name)) match {
case Some(fieldDescriptor) =>
val nullableMismatch = fieldDescriptor.isRequired && field.nullability.nullable
Expand All @@ -32,7 +32,7 @@ object BigQuerySchemaUtils {
case Descriptors.FieldDescriptor.Type.MESSAGE =>
ddlField.fieldType match {
case Type.Struct(nestedFields) =>
alterTableRequired(tableField.getMessageType, nestedFields)
alterTableRequired(tableField.getMessageType, nestedFields).nonEmpty
case _ =>
false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,18 @@ object BigQueryUtils {
def streamIdOf(config: Config.BigQuery): String =
tableIdOf(config).getIAMResourceName + "/streams/_default"

object BQExceptionWithLowerCaseReason {
def unapply(bqe: BigQueryException): Option[(String)] =
Option(bqe.getError()) match {
case Some(bqError) =>
Some(bqError.getReason.toLowerCase)
case None =>
None
}
/**
 * Null-safe accessors for the details carried by a `BigQueryException`.
 *
 * Both `getError()` and the individual fields of the error are wrapped in `Option`, so a
 * missing value yields an empty string rather than a `NullPointerException`.
 */
implicit class BQExceptionSyntax(val bqe: BigQueryException) extends AnyVal {

  /** The lower-cased `reason` of the underlying error, or `""` if there is no error or no reason. */
  def lowerCaseReason: String =
    Option(bqe.getError())
      .flatMap(e => Option(e.getReason))
      .map(_.toLowerCase)
      .getOrElse("")

  /** The lower-cased `message` of the underlying error, or `""` if there is no error or no message. */
  def lowerCaseMessage: String =
    Option(bqe.getError())
      // Read the message here; reading the reason would make this a duplicate of `lowerCaseReason`.
      .flatMap(e => Option(e.getMessage))
      .map(_.toLowerCase)
      .getOrElse("")

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,9 @@ object Processing {
val fields = batch.entities.fields.flatMap { tte =>
tte.mergedField :: tte.recoveries.map(_._2)
}
if (BigQuerySchemaUtils.alterTableRequired(descriptor, fields)) {
env.tableManager.addColumns(fields) *> env.writer.closed.use_
val fieldsToAdd = BigQuerySchemaUtils.alterTableRequired(descriptor, fields)
if (fieldsToAdd.nonEmpty) {
env.tableManager.addColumns(fieldsToAdd.toVector) *> env.writer.closed.use_
} else {
Sync[F].unit
}
Expand Down
Loading

0 comments on commit ca4c710

Please sign in to comment.