Skip to content

Commit

Permalink
common-streams 0.8.x with refactored health monitoring (#78)
Browse files Browse the repository at this point in the history
In common-streams 0.8.x, alerting, retrying, and webhook handling moved out of
the applications and into the common library. That release also adds new features,
such as heartbeat webhooks that start once the loader first becomes healthy.
  • Loading branch information
istreeter authored and oguzhanunlu committed Nov 1, 2024
1 parent d0d689a commit fffeeb2
Show file tree
Hide file tree
Showing 24 changed files with 190 additions and 506 deletions.
10 changes: 5 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ lazy val azure: Project = project
.in(file("modules/azure"))
.settings(BuildSettings.azureSettings)
.settings(libraryDependencies ++= Dependencies.azureDependencies ++ Dependencies.icebergDeltaRuntimeDependencies)
.dependsOn(core)
.dependsOn(core, deltaIceberg)
.enablePlugins(BuildInfoPlugin, JavaAppPackaging, SnowplowDockerPlugin)

lazy val gcp: Project = project
.in(file("modules/gcp"))
.settings(BuildSettings.gcpSettings)
.settings(libraryDependencies ++= Dependencies.gcpDependencies ++ Dependencies.icebergDeltaRuntimeDependencies)
.dependsOn(core)
.dependsOn(core, deltaIceberg)
.enablePlugins(BuildInfoPlugin, JavaAppPackaging, SnowplowDockerPlugin)

lazy val aws: Project = project
Expand All @@ -50,7 +50,7 @@ lazy val aws: Project = project
.dependsOn(core, deltaIceberg)
.enablePlugins(BuildInfoPlugin, JavaAppPackaging, SnowplowDockerPlugin)

/** Packaging: Extra runtime dependencies for alternative assets * */
/** Packaging: Extra runtime dependencies for alternative assets */

lazy val hudi: Project = project
.in(file("packaging/hudi"))
Expand Down Expand Up @@ -94,7 +94,7 @@ lazy val gcpHudi: Project = project
.settings(libraryDependencies ++= Dependencies.gcpDependencies)
.dependsOn(core)
.enablePlugins(BuildInfoPlugin, JavaAppPackaging, SnowplowDockerPlugin)
.dependsOn(hudi % "runtime->runtime")
.dependsOn(hudi % "runtime->runtime;compile->compile")

lazy val azureHudi: Project = project
.in(file("modules/azure"))
Expand All @@ -104,7 +104,7 @@ lazy val azureHudi: Project = project
.settings(libraryDependencies ++= Dependencies.azureDependencies)
.dependsOn(core)
.enablePlugins(BuildInfoPlugin, JavaAppPackaging, SnowplowDockerPlugin)
.dependsOn(hudi % "runtime->runtime")
.dependsOn(hudi % "runtime->runtime;compile->compile")

lazy val gcpBiglake: Project = gcp
.withId("gcpBiglake")
Expand Down
4 changes: 3 additions & 1 deletion config/config.aws.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -249,14 +249,16 @@
}
}

# -- Report alerts to the webhook
# -- Report alerts and heartbeats to the webhook
"webhook": {
# An actual HTTP endpoint
"endpoint": "https://webhook.acme.com",
# Set of arbitrary key-value pairs attached to the payload
"tags": {
"pipeline": "production"
}
# How often to send the heartbeat event
"heartbeat": "60.minutes"
}

# -- Open an HTTP server that returns OK only if the app is healthy
Expand Down
4 changes: 3 additions & 1 deletion config/config.azure.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -220,14 +220,16 @@
}
}

# -- Report alerts to the webhook
# -- Report alerts and heartbeats to the webhook
"webhook": {
# An actual HTTP endpoint
"endpoint": "https://webhook.acme.com",
# Set of arbitrary key-value pairs attached to the payload
"tags": {
"pipeline": "production"
}
# How often to send the heartbeat event
"heartbeat": "60.minutes"
}

# -- Open an HTTP server that returns OK only if the app is healthy
Expand Down
4 changes: 3 additions & 1 deletion config/config.gcp.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -228,14 +228,16 @@
}
}

# -- Report alerts to the webhook
# -- Report alerts and heartbeats to the webhook
"webhook": {
# An actual HTTP endpoint
"endpoint": "https://webhook.acme.com",
# Set of arbitrary key-value pairs attached to the payload
"tags": {
"pipeline": "production"
}
# How often to send the heartbeat event
"heartbeat": "60.minutes"
}

# -- Open an HTTP server that returns OK only if the app is healthy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,27 +41,27 @@ object AwsApp extends LoaderApp[KinesisSourceConfig, KinesisSinkConfig](BuildInf
*/
override def isDestinationSetupError: DestinationSetupErrorCheck = {

/** Exceptions raised by underlying AWS SDK * */
// Exceptions raised by underlying AWS SDK
case _: NoSuchBucketException =>
// S3 bucket does not exist
Some("S3 bucket does not exist or we do not have permissions to see it exists")
"S3 bucket does not exist or we do not have permissions to see it exists"
case e: S3Exception if e.statusCode() === 403 =>
// No permission to read from S3 bucket or to write to S3 bucket
Some("Missing permissions to perform this action on S3 bucket")
"Missing permissions to perform this action on S3 bucket"
case e: S3Exception if e.statusCode() === 301 =>
// Misconfigured AWS region
Some("S3 bucket is not in the expected region")
"S3 bucket is not in the expected region"
case e: GlueAccessDeniedException =>
// No permission to read from Glue catalog
Some(Option(e.getMessage).getOrElse("Missing permissions to perform this action on Glue catalog"))
Option(e.getMessage).getOrElse("Missing permissions to perform this action on Glue catalog")
case _: GlueEntityNotFoundException =>
// Glue database does not exist
Some("Glue resource does not exist or no permission to see it exists")
"Glue resource does not exist or no permission to see it exists"
case e: StsException if e.statusCode() === 403 =>
// No permission to assume the role given to authenticate to S3/Glue
Some("Missing permissions to assume the AWS IAM role")
"Missing permissions to assume the AWS IAM role"

/** Exceptions raised via hadoop's s3a filesystem * */
// Exceptions raised via hadoop's s3a filesystem
case e: UnknownStoreException =>
// S3 bucket does not exist or no permission to see it exists
stripCauseDetails(e)
Expand All @@ -70,10 +70,11 @@ object AwsApp extends LoaderApp[KinesisSourceConfig, KinesisSinkConfig](BuildInf
// 2 - No permission to assume the role given to authenticate to S3
stripCauseDetails(e)
case _: CredentialInitializationException =>
Some("Failed to initialize AWS access credentials")
"Failed to initialize AWS access credentials"

/** Exceptions common to the table format - Delta/Iceberg/Hudi * */
case t => TableFormatSetupError.check(t)
// Exceptions common to the table format - Delta/Iceberg/Hudi
case TableFormatSetupError.check(t) =>
t
}

/**
Expand All @@ -86,13 +87,12 @@ object AwsApp extends LoaderApp[KinesisSourceConfig, KinesisSinkConfig](BuildInf
* In order to have better control of the message sent to the webhook, we remove the cause details
* here, and add back in pertinent cause information later.
*/
private def stripCauseDetails(t: Throwable): Option[String] =
(Option(t.getMessage), Option(t.getCause)) match {
case (Some(message), Some(cause)) =>
private def stripCauseDetails(t: Throwable): String =
Option(t.getCause) match {
case Some(cause) =>
val toRemove = new Regex(":? *" + Regex.quote(cause.toString) + ".*")
val replaced = toRemove.replaceAllIn(message, "")
Some(replaced)
case (other, _) =>
other
toRemove.replaceAllIn(t.getMessage, "")
case None =>
t.getMessage
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ object AzureApp extends LoaderApp[KafkaSourceConfig, KafkaSinkConfig](BuildInfo)

override def badSink: SinkProvider = KafkaSink.resource(_, classTag[SinkAuthHandler])

override def isDestinationSetupError: DestinationSetupErrorCheck = _ => None
override def isDestinationSetupError: DestinationSetupErrorCheck = TableFormatSetupError.check
}
1 change: 1 addition & 0 deletions modules/core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@
"prefix": "snowplow.lakeloader"
}
}
"webhook": ${snowplow.defaults.webhook}
"sentry": {
"tags": {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,58 +13,14 @@ package com.snowplowanalytics.snowplow.lakes
import cats.Show
import cats.implicits.showInterpolator

import com.snowplowanalytics.iglu.core.circe.implicits.igluNormalizeDataJson
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.snowplow.runtime.AppInfo

import io.circe.Json
import io.circe.syntax.EncoderOps
import com.snowplowanalytics.snowplow.runtime.SetupExceptionMessages

sealed trait Alert
object Alert {

/** Restrict the length of an alert message to be compliant with alert iglu schema */
private val MaxAlertPayloadLength = 4096

final case class FailedToCreateEventsTable(causes: List[String]) extends Alert
final case class FailedToCommitEvents(causes: List[String]) extends Alert

def toSelfDescribingJson(
alert: Alert,
appInfo: AppInfo,
tags: Map[String, String]
): Json =
SelfDescribingData(
schema = SchemaKey("com.snowplowanalytics.monitoring.loader", "alert", "jsonschema", SchemaVer.Full(1, 0, 0)),
data = Json.obj(
"appName" -> appInfo.name.asJson,
"appVersion" -> appInfo.version.asJson,
"message" -> getMessage(alert).asJson,
"tags" -> tags.asJson
)
).normalize

private def getMessage(alert: Alert): String = {
val full = alert match {
case FailedToCreateEventsTable(causes) => show"Failed to create events table: $causes"
case FailedToCommitEvents(causes) => show"Failed to write events into table: $causes"
}

full.take(MaxAlertPayloadLength)
}

private implicit def causesShow: Show[List[String]] = {
def removeDuplicateMessages(in: List[String]): List[String] =
in match {
case h :: t :: rest =>
if (h.contains(t)) removeDuplicateMessages(h :: rest)
else if (t.contains(h)) removeDuplicateMessages(t :: rest)
else h :: removeDuplicateMessages(t :: rest)
case fewer => fewer
}
final case class FailedToCreateEventsTable(causes: SetupExceptionMessages) extends Alert

Show.show { causes =>
removeDuplicateMessages(causes).mkString(": ")
}
implicit def showAlert: Show[Alert] = Show[Alert] { case FailedToCreateEventsTable(causes) =>
show"Failed to create events table: $causes"
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,18 @@
package com.snowplowanalytics.snowplow.lakes

import cats.Id
import cats.syntax.either._
import io.circe.Decoder
import io.circe.generic.extras.semiauto._
import io.circe.generic.extras.Configuration
import io.circe.config.syntax._
import com.comcast.ip4s.Port

import org.http4s.{ParseFailure, Uri}
import java.net.URI
import scala.concurrent.duration.FiniteDuration

import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig
import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, Metrics => CommonMetrics, Telemetry}
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, Metrics => CommonMetrics, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs.schemaCriterionDecoder
import com.snowplowanalytics.snowplow.runtime.HealthProbe.decoders._

Expand Down Expand Up @@ -119,17 +117,15 @@ object Config {
metrics: Metrics,
sentry: Option[Sentry],
healthProbe: HealthProbe,
webhook: Option[Webhook]
webhook: Webhook.Config
)

final case class Webhook(endpoint: Uri, tags: Map[String, String])

case class SetupErrorRetries(delay: FiniteDuration)
case class TransientErrorRetries(delay: FiniteDuration, attempts: Int)

case class Retries(
setupErrors: SetupErrorRetries,
transientErrors: TransientErrorRetries
setupErrors: Retrying.Config.ForSetup,
transientErrors: Retrying.Config.ForTransient
)

implicit def decoder[Source: Decoder, Sink: Decoder]: Decoder[Config[Source, Sink]] = {
Expand All @@ -150,14 +146,9 @@ object Config {
case SentryM(None, _) =>
None
}
implicit val http4sUriDecoder: Decoder[Uri] =
Decoder[String].emap(s => Either.catchOnly[ParseFailure](Uri.unsafeFromString(s)).leftMap(_.toString))
implicit val webhookDecoder = deriveConfiguredDecoder[Webhook]
implicit val metricsDecoder = deriveConfiguredDecoder[Metrics]
implicit val healthProbeDecoder = deriveConfiguredDecoder[HealthProbe]
implicit val monitoringDecoder = deriveConfiguredDecoder[Monitoring]
implicit val setupRetries = deriveConfiguredDecoder[SetupErrorRetries]
implicit val transientRetries = deriveConfiguredDecoder[TransientErrorRetries]
implicit val retriesDecoder = deriveConfiguredDecoder[Retries]

// TODO add specific lake-loader docs for license
Expand Down
Loading

0 comments on commit fffeeb2

Please sign in to comment.