diff --git a/build.sbt b/build.sbt index 07ee464..b128aff 100755 --- a/build.sbt +++ b/build.sbt @@ -47,7 +47,7 @@ lazy val aws: Project = project .in(file("modules/aws")) .settings(BuildSettings.awsSettings) .settings(libraryDependencies ++= Dependencies.awsDependencies ++ Dependencies.icebergDeltaRuntimeDependencies) - .dependsOn(core) + .dependsOn(core, deltaIceberg) .enablePlugins(BuildInfoPlugin, JavaAppPackaging, SnowplowDockerPlugin) /** Packaging: Extra runtime dependencies for alternative assets * */ @@ -64,6 +64,11 @@ lazy val biglake: Project = project .settings(BuildSettings.commonSettings ++ BuildSettings.biglakeSettings) .settings(libraryDependencies ++= Dependencies.biglakeDependencies) +lazy val deltaIceberg: Project = project + .in(file("packaging/delta-iceberg")) + .settings(BuildSettings.commonSettings) + .settings(libraryDependencies ++= Dependencies.icebergDeltaRuntimeDependencies) + /** * Packaging: Alternative assets * @@ -79,7 +84,7 @@ lazy val awsHudi: Project = project .settings(libraryDependencies ++= Dependencies.awsDependencies ++ Dependencies.hudiAwsDependencies) .dependsOn(core) .enablePlugins(BuildInfoPlugin, JavaAppPackaging, SnowplowDockerPlugin) - .dependsOn(hudi % "runtime->runtime") + .dependsOn(hudi % "runtime->runtime;compile->compile") lazy val gcpHudi: Project = project .in(file("modules/gcp")) diff --git a/config/config.aws.reference.hocon b/config/config.aws.reference.hocon index 093c673..f75409e 100644 --- a/config/config.aws.reference.hocon +++ b/config/config.aws.reference.hocon @@ -181,6 +181,24 @@ "writerParallelismFraction": 0.5 } + # Retry configuration for lake operation failures + "retries": { + + # -- Configures exponential backoff on errors related to how lake is set up for this loader. + # -- Examples include authentication errors and permissions errors. + # -- This class of errors are reported periodically to the monitoring webhook. + "setupErrors": { + "delay": "30 seconds" + } + + # -- Configures exponential backoff errors that are likely to be transient. + # -- Examples include server errors and network errors + "transientErrors": { + "delay": "1 second" + "attempts": 5 + } + } + # -- Schemas that won't be loaded to the lake. Optional, default value [] "skipSchemas": [ "iglu:com.acme/skipped1/jsonschema/1-0-0" @@ -231,6 +249,16 @@ } } + # -- Report alerts to the webhook + "webhook": { + # An actual HTTP endpoint + "endpoint": "https://webhook.acme.com", + # Set of arbitrary key-value pairs attached to the payload + "tags": { + "pipeline": "production" + } + } + # -- Open a HTTP server that returns OK only if the app is healthy "healthProbe": { "port": 8000 diff --git a/config/config.azure.reference.hocon b/config/config.azure.reference.hocon index 35132e8..50bd3f1 100644 --- a/config/config.azure.reference.hocon +++ b/config/config.azure.reference.hocon @@ -152,6 +152,24 @@ "writerParallelismFraction": 0.5 } + # Retry configuration for lake operation failures + "retries": { + + # -- Configures exponential backoff on errors related to how lake is set up for this loader. + # -- Examples include authentication errors and permissions errors. + # -- This class of errors are reported periodically to the monitoring webhook. + "setupErrors": { + "delay": "30 seconds" + } + + # -- Configures exponential backoff errors that are likely to be transient. + # -- Examples include server errors and network errors + "transientErrors": { + "delay": "1 second" + "attempts": 5 + } + } + # -- Schemas that won't be loaded to the lake. 
Optional, default value [] "skipSchemas": [ "iglu:com.acme/skipped1/jsonschema/1-0-0" @@ -202,6 +220,16 @@ } } + # -- Report alerts to the webhook + "webhook": { + # An actual HTTP endpoint + "endpoint": "https://webhook.acme.com", + # Set of arbitrary key-value pairs attached to the payload + "tags": { + "pipeline": "production" + } + } + # -- Open a HTTP server that returns OK only if the app is healthy "healthProbe": { "port": 8000 diff --git a/config/config.gcp.reference.hocon b/config/config.gcp.reference.hocon index c77b279..745e3aa 100644 --- a/config/config.gcp.reference.hocon +++ b/config/config.gcp.reference.hocon @@ -160,6 +160,24 @@ "writerParallelismFraction": 0.5 } + # Retry configuration for lake operation failures + "retries": { + + # -- Configures exponential backoff on errors related to how lake is set up for this loader. + # -- Examples include authentication errors and permissions errors. + # -- This class of errors are reported periodically to the monitoring webhook. + "setupErrors": { + "delay": "30 seconds" + } + + # -- Configures exponential backoff errors that are likely to be transient. + # -- Examples include server errors and network errors + "transientErrors": { + "delay": "1 second" + "attempts": 5 + } + } + # -- Schemas that won't be loaded to the lake. Optional, default value [] "skipSchemas": [ "iglu:com.acme/skipped1/jsonschema/1-0-0" @@ -210,6 +228,16 @@ } } + # -- Report alerts to the webhook + "webhook": { + # An actual HTTP endpoint + "endpoint": "https://webhook.acme.com", + # Set of arbitrary key-value pairs attached to the payload + "tags": { + "pipeline": "production" + } + } + # -- Open a HTTP server that returns OK only if the app is healthy "healthProbe": { "port": 8000 diff --git a/modules/aws/src/main/scala/com.snowplowanalytics.snowplow.lakes/AwsApp.scala b/modules/aws/src/main/scala/com.snowplowanalytics.snowplow.lakes/AwsApp.scala index 6b7fc32..c2f0cc5 100644 --- a/modules/aws/src/main/scala/com.snowplowanalytics.snowplow.lakes/AwsApp.scala +++ b/modules/aws/src/main/scala/com.snowplowanalytics.snowplow.lakes/AwsApp.scala @@ -10,6 +10,10 @@ package com.snowplowanalytics.snowplow.lakes +import software.amazon.awssdk.services.s3.model.{NoSuchBucketException, S3Exception} +import software.amazon.awssdk.services.sts.model.StsException +import software.amazon.awssdk.services.glue.model.{AccessDeniedException => GlueAccessDeniedException} + import com.snowplowanalytics.snowplow.sources.kinesis.{KinesisSource, KinesisSourceConfig} import com.snowplowanalytics.snowplow.sinks.kinesis.{KinesisSink, KinesisSinkConfig} @@ -18,4 +22,20 @@ object AwsApp extends LoaderApp[KinesisSourceConfig, KinesisSinkConfig](BuildInf override def source: SourceProvider = KinesisSource.build(_) override def badSink: SinkProvider = KinesisSink.resource(_) + + override def isDestinationSetupError: DestinationSetupErrorCheck = { + case _: NoSuchBucketException => + // S3 bucket does not exist + true + case e: S3Exception if e.statusCode() >= 400 && e.statusCode() < 500 => + // No permission to read from S3 bucket or to write to S3 bucket + true + case _: GlueAccessDeniedException => + // No permission to read from Glue catalog + true + case _: StsException => + // No permission to assume the role given to authenticate to S3/Glue + true + case t => TableFormatSetupError.check(t) + } } diff --git a/modules/azure/src/main/scala/com.snowplowanalytics.snowplow.lakes/AzureApp.scala b/modules/azure/src/main/scala/com.snowplowanalytics.snowplow.lakes/AzureApp.scala index 
9722821..e305e79 100644 --- a/modules/azure/src/main/scala/com.snowplowanalytics.snowplow.lakes/AzureApp.scala +++ b/modules/azure/src/main/scala/com.snowplowanalytics.snowplow.lakes/AzureApp.scala @@ -34,4 +34,6 @@ object AzureApp extends LoaderApp[KafkaSourceConfig, KafkaSinkConfig](BuildInfo) override def source: SourceProvider = KafkaSource.build(_, classTag[SourceAuthHandler]) override def badSink: SinkProvider = KafkaSink.resource(_, classTag[SinkAuthHandler]) + + override def isDestinationSetupError: DestinationSetupErrorCheck = _ => false } diff --git a/modules/core/src/main/resources/reference.conf b/modules/core/src/main/resources/reference.conf index 5daf914..1c9a27a 100644 --- a/modules/core/src/main/resources/reference.conf +++ b/modules/core/src/main/resources/reference.conf @@ -127,6 +127,16 @@ "writerParallelismFraction": 0.5 } + "retries": { + "setupErrors": { + "delay": "30 seconds" + } + "transientErrors": { + "delay": "1 second" + "attempts": 5 + } + } + "skipSchemas": [] "respectIgluNullability": true diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Alert.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Alert.scala new file mode 100644 index 0000000..00a40b1 --- /dev/null +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Alert.scala @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2014-present Snowplow Analytics Ltd. All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. 
+ */ + +package com.snowplowanalytics.snowplow.lakes + +import cats.Show +import cats.implicits.showInterpolator + +import com.snowplowanalytics.iglu.core.circe.implicits.igluNormalizeDataJson +import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} +import com.snowplowanalytics.snowplow.runtime.AppInfo + +import io.circe.Json +import io.circe.syntax.EncoderOps + +import java.sql.SQLException + +sealed trait Alert +object Alert { + + /** Restrict the length of an alert message to be compliant with alert iglu schema */ + private val MaxAlertPayloadLength = 4096 + + final case class FailedToCreateEventsTable(cause: Throwable) extends Alert + + def toSelfDescribingJson( + alert: Alert, + appInfo: AppInfo, + tags: Map[String, String] + ): Json = + SelfDescribingData( + schema = SchemaKey("com.snowplowanalytics.monitoring.loader", "alert", "jsonschema", SchemaVer.Full(1, 0, 0)), + data = Json.obj( + "appName" -> appInfo.name.asJson, + "appVersion" -> appInfo.version.asJson, + "message" -> getMessage(alert).asJson, + "tags" -> tags.asJson + ) + ).normalize + + private def getMessage(alert: Alert): String = { + val full = alert match { + case FailedToCreateEventsTable(cause) => show"Failed to create events table: $cause" + } + + full.take(MaxAlertPayloadLength) + } + + private implicit def throwableShow: Show[Throwable] = { + def removeDuplicateMessages(in: List[String]): List[String] = + in match { + case h :: t :: rest => + if (h.contains(t)) removeDuplicateMessages(h :: rest) + else if (t.contains(h)) removeDuplicateMessages(t :: rest) + else h :: removeDuplicateMessages(t :: rest) + case fewer => fewer + } + + def accumulateMessages(t: Throwable): List[String] = { + val nextMessage = t match { + case t: SQLException => Some(s"${t.getMessage} = SqlState: ${t.getSQLState}") + case t => Option(t.getMessage) + } + Option(t.getCause) match { + case Some(cause) => nextMessage.toList ::: accumulateMessages(cause) + case None => nextMessage.toList + } + } + + Show.show { t => + removeDuplicateMessages(accumulateMessages(t)).mkString(": ") + } + } +} diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala index b441ce0..812b47d 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala @@ -11,12 +11,14 @@ package com.snowplowanalytics.snowplow.lakes import cats.Id +import cats.syntax.either._ import io.circe.Decoder import io.circe.generic.extras.semiauto._ import io.circe.generic.extras.Configuration import io.circe.config.syntax._ import com.comcast.ip4s.Port +import org.http4s.{ParseFailure, Uri} import java.net.URI import scala.concurrent.duration.FiniteDuration @@ -38,7 +40,8 @@ case class Config[+Source, +Sink]( monitoring: Config.Monitoring, license: AcceptedLicense, skipSchemas: List[SchemaCriterion], - respectIgluNullability: Boolean + respectIgluNullability: Boolean, + retries: Config.Retries ) object Config { @@ -115,7 +118,18 @@ object Config { case class Monitoring( metrics: Metrics, sentry: Option[Sentry], - healthProbe: HealthProbe + healthProbe: HealthProbe, + webhook: Option[Webhook] + ) + + final case class Webhook(endpoint: Uri, tags: Map[String, String]) + + case class SetupErrorRetries(delay: FiniteDuration) + case class TransientErrorRetries(delay: FiniteDuration, attempts: Int) + + case class Retries( + setupErrors: 
SetupErrorRetries, + transientErrors: TransientErrorRetries ) implicit def decoder[Source: Decoder, Sink: Decoder]: Decoder[Config[Source, Sink]] = { @@ -136,9 +150,15 @@ object Config { case SentryM(None, _) => None } + implicit val http4sUriDecoder: Decoder[Uri] = + Decoder[String].emap(s => Either.catchOnly[ParseFailure](Uri.unsafeFromString(s)).leftMap(_.toString)) + implicit val webhookDecoder = deriveConfiguredDecoder[Webhook] implicit val metricsDecoder = deriveConfiguredDecoder[Metrics] implicit val healthProbeDecoder = deriveConfiguredDecoder[HealthProbe] implicit val monitoringDecoder = deriveConfiguredDecoder[Monitoring] + implicit val setupRetries = deriveConfiguredDecoder[SetupErrorRetries] + implicit val transientRetries = deriveConfiguredDecoder[TransientErrorRetries] + implicit val retriesDecoder = deriveConfiguredDecoder[Retries] // TODO add specific lake-loader docs for license implicit val licenseDecoder = diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Environment.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Environment.scala index 026a5f5..bcf529e 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Environment.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Environment.scala @@ -62,7 +62,8 @@ object Environment { config: Config.WithIglu[SourceConfig, SinkConfig], appInfo: AppInfo, toSource: SourceConfig => F[SourceAndAck[F]], - toSink: SinkConfig => Resource[F, Sink[F]] + toSink: SinkConfig => Resource[F, Sink[F]], + destinationSetupErrorCheck: DestinationSetupErrorCheck ): Resource[F, Environment[F]] = for { _ <- enableSentry[F](appInfo, config.main.monitoring.sentry) @@ -71,10 +72,11 @@ object Environment { _ <- HealthProbe.resource(config.main.monitoring.healthProbe.port, appHealth.status) resolver <- mkResolver[F](config.iglu) httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource + monitoring <- Monitoring.create[F](config.main.monitoring.webhook, appInfo, httpClient) badSink <- toSink(config.main.output.bad.sink).evalTap(_ => appHealth.setServiceHealth(AppHealth.Service.BadSink, true)) windowing <- Resource.eval(EventProcessingConfig.TimedWindows.build(config.main.windowing, config.main.numEagerWindows)) lakeWriter <- LakeWriter.build(config.main.spark, config.main.output.good) - lakeWriterWrapped = LakeWriter.withHandledErrors(lakeWriter, appHealth) + lakeWriterWrapped = LakeWriter.withHandledErrors(lakeWriter, appHealth, monitoring, config.main.retries, destinationSetupErrorCheck) metrics <- Resource.eval(Metrics.build(config.main.monitoring.metrics)) cpuParallelism = chooseCpuParallelism(config.main) } yield Environment( diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/LoaderApp.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/LoaderApp.scala index 3e599e4..0af8567 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/LoaderApp.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/LoaderApp.scala @@ -41,8 +41,9 @@ abstract class LoaderApp[SourceConfig: Decoder, SinkConfig: Decoder]( def source: SourceProvider def badSink: SinkProvider + def isDestinationSetupError: DestinationSetupErrorCheck - final def main: Opts[IO[ExitCode]] = Run.fromCli(info, source, badSink) + final def main: Opts[IO[ExitCode]] = Run.fromCli(info, source, badSink, isDestinationSetupError) } diff --git 
a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Monitoring.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Monitoring.scala new file mode 100644 index 0000000..d0f0620 --- /dev/null +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Monitoring.scala @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2014-present Snowplow Analytics Ltd. All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ + +package com.snowplowanalytics.snowplow.lakes + +import cats.effect.{Resource, Sync} +import cats.implicits._ + +import com.snowplowanalytics.snowplow.runtime.AppInfo + +import org.http4s.circe.jsonEncoder +import org.http4s.client.Client +import org.http4s.{EntityDecoder, Method, Request} + +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.slf4j.Slf4jLogger + +trait Monitoring[F[_]] { + def alert(message: Alert): F[Unit] +} + +object Monitoring { + + private implicit def logger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F] + + def create[F[_]: Sync]( + config: Option[Config.Webhook], + appInfo: AppInfo, + httpClient: Client[F] + )(implicit E: EntityDecoder[F, String] + ): Resource[F, Monitoring[F]] = Resource.pure { + new Monitoring[F] { + + override def alert(message: Alert): F[Unit] = + config match { + case Some(webhookConfig) => + val request = buildHttpRequest(webhookConfig, message) + Logger[F].info(show"Sending alert to ${webhookConfig.endpoint} with details of the setup error...") *> + executeHttpRequest(webhookConfig, httpClient, request) + case None => + Logger[F].debug("Webhook monitoring is not configured, skipping alert") + } + + def buildHttpRequest(webhookConfig: Config.Webhook, alert: Alert): Request[F] = + Request[F](Method.POST, webhookConfig.endpoint) + .withEntity(Alert.toSelfDescribingJson(alert, appInfo, webhookConfig.tags)) + + def executeHttpRequest( + webhookConfig: Config.Webhook, + httpClient: Client[F], + request: Request[F] + ): F[Unit] = + httpClient + .run(request) + .use { response => + if (response.status.isSuccess) Sync[F].unit + else { + response + .as[String] + .flatMap(body => Logger[F].error(show"Webhook ${webhookConfig.endpoint} returned non-2xx response:\n$body")) + } + } + .handleErrorWith { e => + Logger[F].error(e)(show"Webhook ${webhookConfig.endpoint} resulted in exception without a response") + } + } + } + +} diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Run.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Run.scala index c7191dc..08451ea 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Run.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Run.scala @@ -35,12 +35,13 @@ object Run { def fromCli[F[_]: Async, SourceConfig: Decoder, SinkConfig: Decoder]( appInfo: AppInfo, toSource: SourceConfig => F[SourceAndAck[F]], - toBadSink: SinkConfig => Resource[F, Sink[F]] + toBadSink: SinkConfig => Resource[F, Sink[F]], + destinationSetupErrorCheck: DestinationSetupErrorCheck ): Opts[F[ExitCode]] = { val configPathOpt = Opts.option[Path]("config", help = "path to config file") val igluPathOpt = Opts.option[Path]("iglu-config", help = "path to iglu resolver 
config file") (configPathOpt, igluPathOpt).mapN { case (configPath, igluPath) => - fromConfigPaths(appInfo, toSource, toBadSink, configPath, igluPath) + fromConfigPaths(appInfo, toSource, toBadSink, destinationSetupErrorCheck, configPath, igluPath) .race(waitForSignal) .map(_.merge) } @@ -50,6 +51,7 @@ object Run { appInfo: AppInfo, toSource: SourceConfig => F[SourceAndAck[F]], toBadSink: SinkConfig => Resource[F, Sink[F]], + destinationSetupErrorCheck: DestinationSetupErrorCheck, pathToConfig: Path, pathToResolver: Path ): F[ExitCode] = { @@ -58,7 +60,7 @@ object Run { config <- ConfigParser.configFromFile[F, Config[SourceConfig, SinkConfig]](pathToConfig) resolver <- ConfigParser.igluResolverFromFile(pathToResolver) configWithIglu = Config.WithIglu(config, resolver) - _ <- EitherT.right[String](fromConfig(appInfo, toSource, toBadSink, configWithIglu)) + _ <- EitherT.right[String](fromConfig(appInfo, toSource, toBadSink, destinationSetupErrorCheck, configWithIglu)) } yield ExitCode.Success eitherT @@ -76,9 +78,10 @@ object Run { appInfo: AppInfo, toSource: SourceConfig => F[SourceAndAck[F]], toBadSink: SinkConfig => Resource[F, Sink[F]], + destinationSetupErrorCheck: DestinationSetupErrorCheck, config: Config.WithIglu[SourceConfig, SinkConfig] ): F[ExitCode] = - Environment.fromConfig(config, appInfo, toSource, toBadSink).use { env => + Environment.fromConfig(config, appInfo, toSource, toBadSink, destinationSetupErrorCheck).use { env => Processing .stream(env) .concurrently(Telemetry.stream(config.main.telemetry, env.appInfo, env.httpClient)) diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/package.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/package.scala index 8a480ff..609aea7 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/package.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/package.scala @@ -12,4 +12,8 @@ package com.snowplowanalytics.snowplow package object lakes { type AnyConfig = Config[Any, Any] + + // Type for the function that checks whether given exception + // is one of the destination setup errors + type DestinationSetupErrorCheck = Throwable => Boolean } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriter.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriter.scala index b2d79e1..d789464 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriter.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriter.scala @@ -17,7 +17,7 @@ import cats.effect.std.Mutex import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types.StructType -import com.snowplowanalytics.snowplow.lakes.{AppHealth, Config} +import com.snowplowanalytics.snowplow.lakes.{Alert, AppHealth, Config, DestinationSetupErrorCheck, Monitoring} import com.snowplowanalytics.snowplow.lakes.tables.{DeltaWriter, HudiWriter, IcebergWriter, Writer} trait LakeWriter[F[_]] { @@ -86,9 +86,17 @@ object LakeWriter { } yield impl(session, w, writerParallelism, mutex1, mutex2) } - def withHandledErrors[F[_]: Sync](underlying: LakeWriter[F], appHealth: AppHealth[F]): WithHandledErrors[F] = new WithHandledErrors[F] { + def withHandledErrors[F[_]: Async]( + underlying: LakeWriter[F], + appHealth: AppHealth[F], + monitoring: Monitoring[F], + retries: Config.Retries, + destinationSetupErrorCheck: DestinationSetupErrorCheck + ): 
WithHandledErrors[F] = new WithHandledErrors[F] { def createTable: F[Unit] = - underlying.createTable <* appHealth.setServiceHealth(AppHealth.Service.SparkWriter, isHealthy = true) + Retrying.withRetries(appHealth, retries, monitoring, Alert.FailedToCreateEventsTable, destinationSetupErrorCheck) { + underlying.createTable + } def initializeLocalDataFrame(viewName: String): F[Unit] = underlying.initializeLocalDataFrame(viewName) diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Retrying.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Retrying.scala new file mode 100644 index 0000000..04f17db --- /dev/null +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Retrying.scala @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2014-present Snowplow Analytics Ltd. All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ + +package com.snowplowanalytics.snowplow.lakes.processing + +import cats.Applicative +import cats.effect.Sync +import cats.implicits._ + +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.slf4j.Slf4jLogger + +import retry._ +import retry.implicits.retrySyntaxError + +import com.snowplowanalytics.snowplow.lakes.{Alert, AppHealth, Config, DestinationSetupErrorCheck, Monitoring} + +object Retrying { + + private implicit def logger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F] + + def withRetries[F[_]: Sync: Sleep, A]( + appHealth: AppHealth[F], + config: Config.Retries, + monitoring: Monitoring[F], + toAlert: Throwable => Alert, + destinationSetupErrorCheck: DestinationSetupErrorCheck + )( + action: F[A] + ): F[A] = + retryUntilSuccessful(appHealth, config, monitoring, toAlert, destinationSetupErrorCheck, action) <* + appHealth.setServiceHealth(AppHealth.Service.SparkWriter, isHealthy = true) + + private def retryUntilSuccessful[F[_]: Sync: Sleep, A]( + appHealth: AppHealth[F], + config: Config.Retries, + monitoring: Monitoring[F], + toAlert: Throwable => Alert, + destinationSetupErrorCheck: DestinationSetupErrorCheck, + action: F[A] + ): F[A] = + action + .onError(_ => appHealth.setServiceHealth(AppHealth.Service.SparkWriter, isHealthy = false)) + .retryingOnSomeErrors( + isWorthRetrying = destinationSetupErrorCheck(_).pure[F], + policy = policyForSetupErrors[F](config), + onError = logErrorAndSendAlert[F](monitoring, toAlert, _, _) + ) + .retryingOnAllErrors( + policy = policyForTransientErrors[F](config), + onError = logError[F](_, _) + ) + + private def policyForSetupErrors[F[_]: Applicative](config: Config.Retries): RetryPolicy[F] = + RetryPolicies.exponentialBackoff[F](config.setupErrors.delay) + + private def policyForTransientErrors[F[_]: Applicative](config: Config.Retries): RetryPolicy[F] = + RetryPolicies.fullJitter[F](config.transientErrors.delay).join(RetryPolicies.limitRetries(config.transientErrors.attempts - 1)) + + private def logErrorAndSendAlert[F[_]: Sync]( + monitoring: Monitoring[F], + toAlert: Throwable => Alert, + error: Throwable, + details: RetryDetails + ): F[Unit] = + logError(error, details) *> monitoring.alert(toAlert(error)) + + private def logError[F[_]: Sync](error: Throwable, details: RetryDetails): F[Unit] = + 
Logger[F].error(error)(s"Executing command failed. ${extractRetryDetails(details)}") + + private def extractRetryDetails(details: RetryDetails): String = details match { + case RetryDetails.GivingUp(totalRetries, totalDelay) => + s"Giving up on retrying, total retries: $totalRetries, total delay: ${totalDelay.toSeconds} seconds" + case RetryDetails.WillDelayAndRetry(nextDelay, retriesSoFar, cumulativeDelay) => + s"Will retry in ${nextDelay.toMillis} milliseconds, retries so far: $retriesSoFar, total delay so far: ${cumulativeDelay.toMillis} milliseconds" + } +} diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/TestSparkEnvironment.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/TestSparkEnvironment.scala index 082f3db..8b91436 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/TestSparkEnvironment.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/TestSparkEnvironment.scala @@ -36,7 +36,7 @@ object TestSparkEnvironment { appHealth <- Resource.eval(AppHealth.init(10.seconds, source)) _ <- Resource.eval(appHealth.setServiceHealth(AppHealth.Service.BadSink, isHealthy = true)) lakeWriter <- LakeWriter.build[IO](testConfig.spark, testConfig.output.good) - lakeWriterWrapped = LakeWriter.withHandledErrors(lakeWriter, appHealth) + lakeWriterWrapped = LakeWriter.withHandledErrors(lakeWriter, appHealth, dummyMonitoring, retriesConfig, _ => false) } yield Environment( appInfo = appInfo, source = source, @@ -54,6 +54,15 @@ object TestSparkEnvironment { respectIgluNullability = true ) + private val retriesConfig = Config.Retries( + Config.SetupErrorRetries(30.seconds), + Config.TransientErrorRetries(1.second, 5) + ) + + private val dummyMonitoring = new Monitoring[IO] { + override def alert(message: Alert): IO[Unit] = IO.unit + } + private def testSourceAndAck(windows: List[List[TokenedEvents]]): SourceAndAck[IO] = new SourceAndAck[IO] { def stream(config: EventProcessingConfig, processor: EventProcessor[IO]): Stream[IO, Nothing] = diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriterSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriterSpec.scala index 55d9678..79122c6 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriterSpec.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriterSpec.scala @@ -13,13 +13,14 @@ package com.snowplowanalytics.snowplow.lakes.processing import cats.implicits._ import cats.data.NonEmptyList import cats.effect.{IO, Ref} +import cats.effect.testkit.TestControl import org.specs2.Specification import cats.effect.testing.specs2.CatsEffect import org.apache.spark.sql.types.StructType import org.apache.spark.sql.Row import com.snowplowanalytics.snowplow.runtime.HealthProbe -import com.snowplowanalytics.snowplow.lakes.AppHealth +import com.snowplowanalytics.snowplow.lakes._ import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck} import scala.concurrent.duration.{DurationLong, FiniteDuration} @@ -30,9 +31,12 @@ class LakeWriterSpec extends Specification with CatsEffect { def is = s2""" The lake writer should: become healthy after creating the table $e1 - stay unhealthy after failure to create the table $e2 - become healthy after committing to the lake $e3 - become unhealthy after failure to commit to the lake $e4 + retry adding columns and send alerts when 
there is a setup exception $e2 + retry adding columns if there is a transient exception, with limited number of attempts and no monitoring alerts $e3 + become healthy after recovering from an earlier setup error $e4 + become healthy after recovering from an earlier transient error $e5 + become healthy after committing to the lake $e6 + become unhealthy after failure to commit to the lake $e7 """ def e1 = @@ -41,7 +45,13 @@ class LakeWriterSpec extends Specification with CatsEffect { Action.CreateTableAttempted ) - val wrappedLakeWriter = LakeWriter.withHandledErrors(c.lakeWriter, c.appHealth) + val wrappedLakeWriter = LakeWriter.withHandledErrors( + c.lakeWriter, + c.appHealth, + c.monitoring, + retriesConfig, + dummyDestinationSetupErrorCheck + ) for { healthBefore <- c.appHealth.status @@ -56,35 +66,154 @@ class LakeWriterSpec extends Specification with CatsEffect { } def e2 = { + val mocks = Mocks(List.fill(100)(Response.ExceptionThrown(new RuntimeException("boom!")))) + control(mocks).flatMap { c => + val expected = Vector( + Action.CreateTableAttempted, + Action.SentAlert(0L), + Action.CreateTableAttempted, + Action.SentAlert(30L), + Action.CreateTableAttempted, + Action.SentAlert(90L), + Action.CreateTableAttempted, + Action.SentAlert(210L) + ) + + val wrappedLakeWriter = LakeWriter.withHandledErrors( + c.lakeWriter, + c.appHealth, + c.monitoring, + retriesConfig, + _ => true + ) + + val test = for { + healthBefore <- c.appHealth.status + fiber <- wrappedLakeWriter.createTable.voidError.start + _ <- IO.sleep(4.minutes) + _ <- fiber.cancel + healthAfter <- c.appHealth.status + state <- c.state.get + } yield List( + state should beEqualTo(expected), + healthBefore should beUnhealthy, + healthAfter should beUnhealthy + ).reduce(_ and _) + + TestControl.executeEmbed(test) + } + } + + def e3 = { + val mocks = Mocks(List.fill(100)(Response.ExceptionThrown(new RuntimeException("boom!")))) + control(mocks).flatMap { c => + val expected = Vector( + Action.CreateTableAttempted, + Action.CreateTableAttempted, + Action.CreateTableAttempted, + Action.CreateTableAttempted, + Action.CreateTableAttempted + ) + + val wrappedLakeWriter = LakeWriter.withHandledErrors( + c.lakeWriter, + c.appHealth, + c.monitoring, + retriesConfig, + _ => false + ) + + val test = for { + healthBefore <- c.appHealth.status + _ <- wrappedLakeWriter.createTable.voidError + healthAfter <- c.appHealth.status + state <- c.state.get + } yield List( + state should beEqualTo(expected), + healthBefore should beUnhealthy, + healthAfter should beUnhealthy + ).reduce(_ and _) + + TestControl.executeEmbed(test) + } + } + + def e4 = { val mocks = Mocks(List(Response.ExceptionThrown(new RuntimeException("boom!")))) control(mocks).flatMap { c => val expected = Vector( + Action.CreateTableAttempted, + Action.SentAlert(0L), Action.CreateTableAttempted ) - val wrappedLakeWriter = LakeWriter.withHandledErrors(c.lakeWriter, c.appHealth) + val wrappedLakeWriter = LakeWriter.withHandledErrors( + c.lakeWriter, + c.appHealth, + c.monitoring, + retriesConfig, + _ => true + ) - for { + val test = for { healthBefore <- c.appHealth.status _ <- wrappedLakeWriter.createTable.voidError healthAfter <- c.appHealth.status state <- c.state.get } yield List( state should beEqualTo(expected), - healthBefore should beAnInstanceOf[HealthProbe.Unhealthy], - healthAfter should beAnInstanceOf[HealthProbe.Unhealthy] + healthBefore should beUnhealthy, + healthAfter should beHealthy ).reduce(_ and _) + + TestControl.executeEmbed(test) } + } + + def e5 = { + val 
mocks = Mocks(List(Response.ExceptionThrown(new RuntimeException("boom!")))) + control(mocks).flatMap { c => + val expected = Vector( + Action.CreateTableAttempted, + Action.CreateTableAttempted + ) + val wrappedLakeWriter = LakeWriter.withHandledErrors( + c.lakeWriter, + c.appHealth, + c.monitoring, + retriesConfig, + _ => false + ) + + val test = for { + healthBefore <- c.appHealth.status + _ <- wrappedLakeWriter.createTable.voidError + healthAfter <- c.appHealth.status + state <- c.state.get + } yield List( + state should beEqualTo(expected), + healthBefore should beUnhealthy, + healthAfter should beHealthy + ).reduce(_ and _) + + TestControl.executeEmbed(test) + } } - def e3 = + def e6 = control().flatMap { c => val expected = Vector( Action.CommitAttempted("testview") ) - val wrappedLakeWriter = LakeWriter.withHandledErrors(c.lakeWriter, c.appHealth) + val wrappedLakeWriter = LakeWriter.withHandledErrors( + c.lakeWriter, + c.appHealth, + c.monitoring, + retriesConfig, + dummyDestinationSetupErrorCheck + ) for { healthBefore <- c.appHealth.status @@ -98,7 +227,7 @@ class LakeWriterSpec extends Specification with CatsEffect { ).reduce(_ and _) } - def e4 = { + def e7 = { val mocks = Mocks(List(Response.Success, Response.ExceptionThrown(new RuntimeException("boom!")))) control(mocks).flatMap { c => @@ -107,7 +236,13 @@ class LakeWriterSpec extends Specification with CatsEffect { Action.CommitAttempted("testview2") ) - val wrappedLakeWriter = LakeWriter.withHandledErrors(c.lakeWriter, c.appHealth) + val wrappedLakeWriter = LakeWriter.withHandledErrors( + c.lakeWriter, + c.appHealth, + c.monitoring, + retriesConfig, + dummyDestinationSetupErrorCheck + ) for { _ <- wrappedLakeWriter.commit("testview1") @@ -123,6 +258,24 @@ class LakeWriterSpec extends Specification with CatsEffect { } } + /** Convenience matchers for health probe * */ + + def beHealthy: org.specs2.matcher.Matcher[HealthProbe.Status] = { (status: HealthProbe.Status) => + val result = status match { + case HealthProbe.Healthy => true + case HealthProbe.Unhealthy(_) => false + } + (result, s"$status is not healthy") + } + + def beUnhealthy: org.specs2.matcher.Matcher[HealthProbe.Status] = { (status: HealthProbe.Status) => + val result = status match { + case HealthProbe.Healthy => false + case HealthProbe.Unhealthy(_) => true + } + (result, s"$status is not unhealthy") + } + } object LakeWriterSpec { @@ -131,6 +284,7 @@ object LakeWriterSpec { object Action { case object CreateTableAttempted extends Action case class CommitAttempted(viewName: String) extends Action + case class SentAlert(timeSentSeconds: Long) extends Action } sealed trait Response @@ -144,7 +298,13 @@ object LakeWriterSpec { case class Control( state: Ref[IO, Vector[Action]], lakeWriter: LakeWriter[IO], - appHealth: AppHealth[IO] + appHealth: AppHealth[IO], + monitoring: Monitoring[IO] + ) + + val retriesConfig = Config.Retries( + Config.SetupErrorRetries(30.seconds), + Config.TransientErrorRetries(1.second, 5) ) def control(mocks: Mocks = Mocks(Nil)): IO[Control] = @@ -152,7 +312,7 @@ object LakeWriterSpec { state <- Ref[IO].of(Vector.empty[Action]) appHealth <- testAppHealth tableManager <- testLakeWriter(state, mocks.lakeWriterResults) - } yield Control(state, tableManager, appHealth) + } yield Control(state, tableManager, appHealth, testMonitoring(state)) private def testAppHealth: IO[AppHealth[IO]] = { val healthySource = new SourceAndAck[IO] { @@ -167,6 +327,16 @@ object LakeWriterSpec { } } + private def testMonitoring(state: Ref[IO, Vector[Action]]): 
Monitoring[IO] = new Monitoring[IO] { + def alert(message: Alert): IO[Unit] = + for { + now <- IO.realTime + _ <- state.update(_ :+ Action.SentAlert(now.toSeconds)) + } yield () + } + + private val dummyDestinationSetupErrorCheck: Throwable => Boolean = _ => false + private def testLakeWriter(state: Ref[IO, Vector[Action]], mocks: List[Response]): IO[LakeWriter[IO]] = for { mocksRef <- Ref[IO].of(mocks) diff --git a/modules/gcp/src/main/scala/com.snowplowanalytics.snowplow.lakes/GcpApp.scala b/modules/gcp/src/main/scala/com.snowplowanalytics.snowplow.lakes/GcpApp.scala index 6c63fd1..5ee3bc6 100644 --- a/modules/gcp/src/main/scala/com.snowplowanalytics.snowplow.lakes/GcpApp.scala +++ b/modules/gcp/src/main/scala/com.snowplowanalytics.snowplow.lakes/GcpApp.scala @@ -18,4 +18,6 @@ object GcpApp extends LoaderApp[PubsubSourceConfig, PubsubSinkConfig](BuildInfo) override def source: SourceProvider = PubsubSource.build(_) override def badSink: SinkProvider = PubsubSink.resource(_) + + override def isDestinationSetupError: DestinationSetupErrorCheck = _ => false } diff --git a/packaging/delta-iceberg/src/main/scala/com.snowplowanalytics.snowplow.lakes/TableFormatSetupError.scala b/packaging/delta-iceberg/src/main/scala/com.snowplowanalytics.snowplow.lakes/TableFormatSetupError.scala new file mode 100644 index 0000000..f455c5c --- /dev/null +++ b/packaging/delta-iceberg/src/main/scala/com.snowplowanalytics.snowplow.lakes/TableFormatSetupError.scala @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2014-present Snowplow Analytics Ltd. All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ + +package com.snowplowanalytics.snowplow.lakes + +import org.apache.iceberg.exceptions.{ForbiddenException => IcebergForbiddenException, NotFoundException => IcebergNotFoundException} + +object TableFormatSetupError { + + // Check if given exception is specific to iceberg format + def check(t: Throwable): Boolean = + t match { + case _: IcebergNotFoundException => + // Glue catalog does not exist + true + case _: IcebergForbiddenException => + // No permission to create a table in Glue catalog + true + case _ => + false + } +} diff --git a/packaging/hudi/src/main/scala/com.snowplowanalytics.snowplow.lakes/TableFormatSetupError.scala b/packaging/hudi/src/main/scala/com.snowplowanalytics.snowplow.lakes/TableFormatSetupError.scala new file mode 100644 index 0000000..d9878f9 --- /dev/null +++ b/packaging/hudi/src/main/scala/com.snowplowanalytics.snowplow.lakes/TableFormatSetupError.scala @@ -0,0 +1,18 @@ +/* + * Copyright (c) 2014-present Snowplow Analytics Ltd. All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. 
+ */ + +package com.snowplowanalytics.snowplow.lakes + +object TableFormatSetupError { + + // Check if given exception is specific to hudi format + // TODO: Implement it properly + def check: Throwable => Boolean = _ => false +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 17678c9..a8d4cb0 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -140,8 +140,8 @@ object Dependencies { ) val icebergDeltaRuntimeDependencies = Seq( + iceberg, delta % Runtime, - iceberg % Runtime, Spark.coreForIcebergDelta % Runtime, Spark.sqlForIcebergDelta % Runtime ) @@ -170,8 +170,8 @@ object Dependencies { kinesis, hadoopAws.exclude("software.amazon.awssdk", "bundle"), awsCore, // Dependency on aws sdk v1 will likely be removed in the next release of hadoop-aws - awsGlue % Runtime, - awsS3 % Runtime, + awsS3, + awsGlue, awsS3Transfer % Runtime, awsSts % Runtime, hadoopClient
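
Note on the retry layering introduced in Retrying.withRetries: setup errors (as classified by the app's DestinationSetupErrorCheck) are retried indefinitely with exponential backoff and an alert on every failed attempt, while all other errors fall through to a jittered, attempt-limited retry with no alerting. The following is a minimal, self-contained sketch of that behaviour, assuming cats-effect 3 and cats-retry 3.x on the classpath; isSetupError, createTable and the println-based "alert" are stand-ins for illustration, not part of the loader's API.

import cats.effect.{IO, IOApp}
import retry._
import retry.implicits._
import scala.concurrent.duration._

object RetrySketch extends IOApp.Simple {

  // Stand-in for DestinationSetupErrorCheck: treat IllegalStateException as a setup error
  val isSetupError: Throwable => Boolean = {
    case _: IllegalStateException => true
    case _                        => false
  }

  // Inner layer: retry setup errors forever with exponential backoff (30s base),
  // "alerting" on each failed attempt. Outer layer: retry any other error with
  // full jitter (1s base), giving up after 5 attempts in total.
  def withRetries[A](action: IO[A]): IO[A] =
    action
      .retryingOnSomeErrors(
        isWorthRetrying = e => IO.pure(isSetupError(e)),
        policy          = RetryPolicies.exponentialBackoff[IO](30.seconds),
        onError         = (e, details) => IO.println(s"setup error, alerting: ${e.getMessage} ($details)")
      )
      .retryingOnAllErrors(
        policy  = RetryPolicies.fullJitter[IO](1.second).join(RetryPolicies.limitRetries[IO](4)),
        onError = (e, details) => IO.println(s"transient error: ${e.getMessage} ($details)")
      )

  // Example action that always fails with a non-setup (transient-style) error
  def createTable: IO[Unit] = IO.raiseError(new RuntimeException("network blip"))

  def run: IO[Unit] = withRetries(createTable).attempt.void
}

The split mirrors the config defaults (setupErrors.delay = 30 seconds, transientErrors.delay = 1 second, attempts = 5): misconfiguration is never fatal but is surfaced to the monitoring webhook, whereas transient failures are bounded so a genuinely broken destination eventually marks the service unhealthy.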
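
For reference, the webhook configured under monitoring.webhook receives the alert as a self-describing JSON built by Alert.toSelfDescribingJson. A rough sketch of the payload shape, assuming circe on the classpath; the appName, appVersion and message values below are illustrative, not taken from a real run.

import io.circe.Json
import io.circe.syntax._

object AlertPayloadSketch extends App {
  // Shape produced by SelfDescribingData(...).normalize for the alert schema
  val payload: Json = Json.obj(
    "schema" -> "iglu:com.snowplowanalytics.monitoring.loader/alert/jsonschema/1-0-0".asJson,
    "data" -> Json.obj(
      "appName"    -> "lake-loader".asJson,                          // illustrative
      "appVersion" -> "x.y.z".asJson,                                // illustrative
      "message"    -> "Failed to create events table: ...".asJson,   // truncated to 4096 chars
      "tags"       -> Map("pipeline" -> "production").asJson         // from monitoring.webhook.tags
    )
  )
  println(payload.spaces2)
}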