Skip to content

Commit

Permalink
Implement alerting and retrying mechanisms
Browse files Browse the repository at this point in the history
Two features has been added in this commit: alerting and retrying

For alerting, webhook method is used similar to other Snowplow apps. Alert message is sent to URL given in the config. Alerts are sent for some error cases, not for all of them. It is implemented such that it is sent only for setup errors. The error cases where alert sent can be extended in the future, of course.

For retrying, two retry policies can be defined similar to Snowflake Loader. One of them is for setup errors and other one is for transient errors. Alert would be sent only for setup errors, not for transient errors.

Also, possible setup error cases for Iceberg/Glue/S3 are added in this commit as well. Error cases for other destinations/table formats will be added later.
  • Loading branch information
spenes committed Jul 26, 2024
1 parent 056d9a8 commit 30e9e2a
Show file tree
Hide file tree
Showing 22 changed files with 661 additions and 33 deletions.
9 changes: 7 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ lazy val aws: Project = project
.in(file("modules/aws"))
.settings(BuildSettings.awsSettings)
.settings(libraryDependencies ++= Dependencies.awsDependencies ++ Dependencies.icebergDeltaRuntimeDependencies)
.dependsOn(core)
.dependsOn(core, deltaIceberg)
.enablePlugins(BuildInfoPlugin, JavaAppPackaging, SnowplowDockerPlugin)

/** Packaging: Extra runtime dependencies for alternative assets * */
Expand All @@ -64,6 +64,11 @@ lazy val biglake: Project = project
.settings(BuildSettings.commonSettings ++ BuildSettings.biglakeSettings)
.settings(libraryDependencies ++= Dependencies.biglakeDependencies)

lazy val deltaIceberg: Project = project
.in(file("packaging/delta-iceberg"))
.settings(BuildSettings.commonSettings)
.settings(libraryDependencies ++= Dependencies.icebergDeltaRuntimeDependencies)

/**
* Packaging: Alternative assets
*
Expand All @@ -79,7 +84,7 @@ lazy val awsHudi: Project = project
.settings(libraryDependencies ++= Dependencies.awsDependencies ++ Dependencies.hudiAwsDependencies)
.dependsOn(core)
.enablePlugins(BuildInfoPlugin, JavaAppPackaging, SnowplowDockerPlugin)
.dependsOn(hudi % "runtime->runtime")
.dependsOn(hudi % "runtime->runtime;compile->compile")

lazy val gcpHudi: Project = project
.in(file("modules/gcp"))
Expand Down
28 changes: 28 additions & 0 deletions config/config.aws.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,24 @@
"writerParallelismFraction": 0.5
}

# Retry configuration for lake operation failures
"retries": {

# -- Configures exponential backoff on errors related to how lake is set up for this loader.
# -- Examples include authentication errors and permissions errors.
# -- This class of errors are reported periodically to the monitoring webhook.
"setupErrors": {
"delay": "30 seconds"
}

# -- Configures exponential backoff errors that are likely to be transient.
# -- Examples include server errors and network errors
"transientErrors": {
"delay": "1 second"
"attempts": 5
}
}

# -- Schemas that won't be loaded to the lake. Optional, default value []
"skipSchemas": [
"iglu:com.acme/skipped1/jsonschema/1-0-0"
Expand Down Expand Up @@ -231,6 +249,16 @@
}
}

# -- Report alerts to the webhook
"webhook": {
# An actual HTTP endpoint
"endpoint": "https://webhook.acme.com",
# Set of arbitrary key-value pairs attached to the payload
"tags": {
"pipeline": "production"
}
}

# -- Open a HTTP server that returns OK only if the app is healthy
"healthProbe": {
"port": 8000
Expand Down
28 changes: 28 additions & 0 deletions config/config.azure.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,24 @@
"writerParallelismFraction": 0.5
}

# Retry configuration for lake operation failures
"retries": {

# -- Configures exponential backoff on errors related to how lake is set up for this loader.
# -- Examples include authentication errors and permissions errors.
# -- This class of errors are reported periodically to the monitoring webhook.
"setupErrors": {
"delay": "30 seconds"
}

# -- Configures exponential backoff errors that are likely to be transient.
# -- Examples include server errors and network errors
"transientErrors": {
"delay": "1 second"
"attempts": 5
}
}

# -- Schemas that won't be loaded to the lake. Optional, default value []
"skipSchemas": [
"iglu:com.acme/skipped1/jsonschema/1-0-0"
Expand Down Expand Up @@ -202,6 +220,16 @@
}
}

# -- Report alerts to the webhook
"webhook": {
# An actual HTTP endpoint
"endpoint": "https://webhook.acme.com",
# Set of arbitrary key-value pairs attached to the payload
"tags": {
"pipeline": "production"
}
}

# -- Open a HTTP server that returns OK only if the app is healthy
"healthProbe": {
"port": 8000
Expand Down
28 changes: 28 additions & 0 deletions config/config.gcp.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,24 @@
"writerParallelismFraction": 0.5
}

# Retry configuration for lake operation failures
"retries": {

# -- Configures exponential backoff on errors related to how lake is set up for this loader.
# -- Examples include authentication errors and permissions errors.
# -- This class of errors are reported periodically to the monitoring webhook.
"setupErrors": {
"delay": "30 seconds"
}

# -- Configures exponential backoff errors that are likely to be transient.
# -- Examples include server errors and network errors
"transientErrors": {
"delay": "1 second"
"attempts": 5
}
}

# -- Schemas that won't be loaded to the lake. Optional, default value []
"skipSchemas": [
"iglu:com.acme/skipped1/jsonschema/1-0-0"
Expand Down Expand Up @@ -210,6 +228,16 @@
}
}

# -- Report alerts to the webhook
"webhook": {
# An actual HTTP endpoint
"endpoint": "https://webhook.acme.com",
# Set of arbitrary key-value pairs attached to the payload
"tags": {
"pipeline": "production"
}
}

# -- Open a HTTP server that returns OK only if the app is healthy
"healthProbe": {
"port": 8000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@

package com.snowplowanalytics.snowplow.lakes

import software.amazon.awssdk.services.s3.model.{NoSuchBucketException, S3Exception}
import software.amazon.awssdk.services.sts.model.StsException
import software.amazon.awssdk.services.glue.model.{AccessDeniedException => GlueAccessDeniedException}

import com.snowplowanalytics.snowplow.sources.kinesis.{KinesisSource, KinesisSourceConfig}
import com.snowplowanalytics.snowplow.sinks.kinesis.{KinesisSink, KinesisSinkConfig}

Expand All @@ -18,4 +22,20 @@ object AwsApp extends LoaderApp[KinesisSourceConfig, KinesisSinkConfig](BuildInf
override def source: SourceProvider = KinesisSource.build(_)

override def badSink: SinkProvider = KinesisSink.resource(_)

override def isDestinationSetupError: DestinationSetupErrorCheck = {
case _: NoSuchBucketException =>
// S3 bucket does not exist
true
case e: S3Exception if e.statusCode() >= 400 && e.statusCode() < 500 =>
// No permission to read from S3 bucket or to write to S3 bucket
true
case _: GlueAccessDeniedException =>
// No permission to read from Glue catalog
true
case _: StsException =>
// No permission to assume the role given to authenticate to S3/Glue
true
case t => TableFormatSetupError.check(t)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,6 @@ object AzureApp extends LoaderApp[KafkaSourceConfig, KafkaSinkConfig](BuildInfo)
override def source: SourceProvider = KafkaSource.build(_, classTag[SourceAuthHandler])

override def badSink: SinkProvider = KafkaSink.resource(_, classTag[SinkAuthHandler])

override def isDestinationSetupError: DestinationSetupErrorCheck = _ => false
}
10 changes: 10 additions & 0 deletions modules/core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,16 @@
"writerParallelismFraction": 0.5
}

"retries": {
"setupErrors": {
"delay": "30 seconds"
}
"transientErrors": {
"delay": "1 second"
"attempts": 5
}
}

"skipSchemas": []
"respectIgluNullability": true

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright (c) 2014-present Snowplow Analytics Ltd. All rights reserved.
*
* This software is made available by Snowplow Analytics, Ltd.,
* under the terms of the Snowplow Limited Use License Agreement, Version 1.0
* located at https://docs.snowplow.io/limited-use-license-1.0
* BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
* OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
*/

package com.snowplowanalytics.snowplow.lakes

import cats.Show
import cats.implicits.showInterpolator

import com.snowplowanalytics.iglu.core.circe.implicits.igluNormalizeDataJson
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.snowplow.runtime.AppInfo

import io.circe.Json
import io.circe.syntax.EncoderOps

import java.sql.SQLException

sealed trait Alert
object Alert {

/** Restrict the length of an alert message to be compliant with alert iglu schema */
private val MaxAlertPayloadLength = 4096

final case class FailedToCreateEventsTable(cause: Throwable) extends Alert

def toSelfDescribingJson(
alert: Alert,
appInfo: AppInfo,
tags: Map[String, String]
): Json =
SelfDescribingData(
schema = SchemaKey("com.snowplowanalytics.monitoring.loader", "alert", "jsonschema", SchemaVer.Full(1, 0, 0)),
data = Json.obj(
"appName" -> appInfo.name.asJson,
"appVersion" -> appInfo.version.asJson,
"message" -> getMessage(alert).asJson,
"tags" -> tags.asJson
)
).normalize

private def getMessage(alert: Alert): String = {
val full = alert match {
case FailedToCreateEventsTable(cause) => show"Failed to create events table: $cause"
}

full.take(MaxAlertPayloadLength)
}

private implicit def throwableShow: Show[Throwable] = {
def removeDuplicateMessages(in: List[String]): List[String] =
in match {
case h :: t :: rest =>
if (h.contains(t)) removeDuplicateMessages(h :: rest)
else if (t.contains(h)) removeDuplicateMessages(t :: rest)
else h :: removeDuplicateMessages(t :: rest)
case fewer => fewer
}

def accumulateMessages(t: Throwable): List[String] = {
val nextMessage = t match {
case t: SQLException => Some(s"${t.getMessage} = SqlState: ${t.getSQLState}")
case t => Option(t.getMessage)
}
Option(t.getCause) match {
case Some(cause) => nextMessage.toList ::: accumulateMessages(cause)
case None => nextMessage.toList
}
}

Show.show { t =>
removeDuplicateMessages(accumulateMessages(t)).mkString(": ")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
package com.snowplowanalytics.snowplow.lakes

import cats.Id
import cats.syntax.either._
import io.circe.Decoder
import io.circe.generic.extras.semiauto._
import io.circe.generic.extras.Configuration
import io.circe.config.syntax._
import com.comcast.ip4s.Port

import org.http4s.{ParseFailure, Uri}
import java.net.URI
import scala.concurrent.duration.FiniteDuration

Expand All @@ -38,7 +40,8 @@ case class Config[+Source, +Sink](
monitoring: Config.Monitoring,
license: AcceptedLicense,
skipSchemas: List[SchemaCriterion],
respectIgluNullability: Boolean
respectIgluNullability: Boolean,
retries: Config.Retries
)

object Config {
Expand Down Expand Up @@ -115,7 +118,18 @@ object Config {
case class Monitoring(
metrics: Metrics,
sentry: Option[Sentry],
healthProbe: HealthProbe
healthProbe: HealthProbe,
webhook: Option[Webhook]
)

final case class Webhook(endpoint: Uri, tags: Map[String, String])

case class SetupErrorRetries(delay: FiniteDuration)
case class TransientErrorRetries(delay: FiniteDuration, attempts: Int)

case class Retries(
setupErrors: SetupErrorRetries,
transientErrors: TransientErrorRetries
)

implicit def decoder[Source: Decoder, Sink: Decoder]: Decoder[Config[Source, Sink]] = {
Expand All @@ -136,9 +150,15 @@ object Config {
case SentryM(None, _) =>
None
}
implicit val http4sUriDecoder: Decoder[Uri] =
Decoder[String].emap(s => Either.catchOnly[ParseFailure](Uri.unsafeFromString(s)).leftMap(_.toString))
implicit val webhookDecoder = deriveConfiguredDecoder[Webhook]
implicit val metricsDecoder = deriveConfiguredDecoder[Metrics]
implicit val healthProbeDecoder = deriveConfiguredDecoder[HealthProbe]
implicit val monitoringDecoder = deriveConfiguredDecoder[Monitoring]
implicit val setupRetries = deriveConfiguredDecoder[SetupErrorRetries]
implicit val transientRetries = deriveConfiguredDecoder[TransientErrorRetries]
implicit val retriesDecoder = deriveConfiguredDecoder[Retries]

// TODO add specific lake-loader docs for license
implicit val licenseDecoder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ object Environment {
config: Config.WithIglu[SourceConfig, SinkConfig],
appInfo: AppInfo,
toSource: SourceConfig => F[SourceAndAck[F]],
toSink: SinkConfig => Resource[F, Sink[F]]
toSink: SinkConfig => Resource[F, Sink[F]],
destinationSetupErrorCheck: DestinationSetupErrorCheck
): Resource[F, Environment[F]] =
for {
_ <- enableSentry[F](appInfo, config.main.monitoring.sentry)
Expand All @@ -71,10 +72,11 @@ object Environment {
_ <- HealthProbe.resource(config.main.monitoring.healthProbe.port, appHealth.status)
resolver <- mkResolver[F](config.iglu)
httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource
monitoring <- Monitoring.create[F](config.main.monitoring.webhook, appInfo, httpClient)
badSink <- toSink(config.main.output.bad.sink).evalTap(_ => appHealth.setServiceHealth(AppHealth.Service.BadSink, true))
windowing <- Resource.eval(EventProcessingConfig.TimedWindows.build(config.main.windowing, config.main.numEagerWindows))
lakeWriter <- LakeWriter.build(config.main.spark, config.main.output.good)
lakeWriterWrapped = LakeWriter.withHandledErrors(lakeWriter, appHealth)
lakeWriterWrapped = LakeWriter.withHandledErrors(lakeWriter, appHealth, monitoring, config.main.retries, destinationSetupErrorCheck)
metrics <- Resource.eval(Metrics.build(config.main.monitoring.metrics))
cpuParallelism = chooseCpuParallelism(config.main)
} yield Environment(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ abstract class LoaderApp[SourceConfig: Decoder, SinkConfig: Decoder](

def source: SourceProvider
def badSink: SinkProvider
def isDestinationSetupError: DestinationSetupErrorCheck

final def main: Opts[IO[ExitCode]] = Run.fromCli(info, source, badSink)
final def main: Opts[IO[ExitCode]] = Run.fromCli(info, source, badSink, isDestinationSetupError)

}

Expand Down
Loading

0 comments on commit 30e9e2a

Please sign in to comment.