Skip to content

Commit

Permalink
Add telemetry (close #489)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Jan 3, 2022
1 parent 82281c3 commit a6f44ab
Show file tree
Hide file tree
Showing 16 changed files with 349 additions and 83 deletions.
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ lazy val commonFs2 = project
Dependencies.Libraries.pureconfig.withRevision(Dependencies.V.pureconfig013),
Dependencies.Libraries.pureconfigCats.withRevision(Dependencies.V.pureconfig013),
Dependencies.Libraries.pureconfigCirce.withRevision(Dependencies.V.pureconfig013),
Dependencies.Libraries.trackerCore,
Dependencies.Libraries.emitterHttps,
Dependencies.Libraries.specs2,
Dependencies.Libraries.specs2CE,
Dependencies.Libraries.scalacheck,
Expand Down
30 changes: 30 additions & 0 deletions config/config.kinesis.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -237,4 +237,34 @@
"cloudwatch": true
}
}

# Optional, configure telemetry
# All the fields are optional
"telemetry": {
# Set to true to disable telemetry
"disable": false
# Interval for the heartbeat event
"interval": 15 minutes
# HTTP method used to send the heartbeat event
"method": "POST"
# URI of the collector receiving the heartbeat event
"collectorUri": "collector-g.snowplowanalytics.com"
# Port of the collector receiving the heartbeat event
"collectorPort": 443
# Whether to use https or not
"secure": true
# Identifier intended to tie events together across modules,
# infrastructure and apps when used consistently
"userProvidedId": "my_pipeline"
# ID automatically generated upon running a module's deployment script
# Intended to identify each independent module, and the infrastructure it controls
"autoGeneratedId": "hfy67e5ydhtrd"
# Unique identifier for the VM instance
# Unique for each instance of the app running within a module
"instanceId": "665bhft5u6udjf"
# Name of the terraform module that deployed the app
"moduleName": "enrich-kinesis-ce"
# Version of the terraform module that deployed the app
"moduleVersion": "1.0.0"
}
}
41 changes: 41 additions & 0 deletions config/config.pubsub.hocon.sample
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,45 @@
}

}

# Optional, configure telemetry
# All the fields are optional
"telemetry": {

# Set to true to disable telemetry
"disable": false

# Interval for the heartbeat event
"interval": 15 minutes

# HTTP method used to send the heartbeat event
"method": POST

# URI of the collector receiving the heartbeat event
"collectorUri": collector-g.snowplowanalytics.com

# Port of the collector receiving the heartbeat event
"collectorPort": 443

# Whether to use https or not
"secure": true

# Identifier intended to tie events together across modules,
# infrastructure and apps when used consistently
"userProvidedId": my_pipeline

# ID automatically generated upon running a module's deployment script
# Intended to identify each independent module, and the infrastructure it controls
"autoGeneratedId": hfy67e5ydhtrd

# Unique identifier for the VM instance
# Unique for each instance of the app running within a module
instanceId = 665bhft5u6udjf

# Name of the terraform module that deployed the app
moduleName = enrich-kinesis-ce

# Version of the terraform module that deployed the app
moduleVersion = 1.0.0
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import _root_.io.circe.Json

import _root_.io.sentry.{Sentry, SentryClient}

import org.http4s.client.{Client => HttpClient}

import com.snowplowanalytics.iglu.client.{Client => IgluClient}
import com.snowplowanalytics.iglu.client.resolver.registries.{Http4sRegistryLookup, RegistryLookup}

Expand All @@ -38,11 +40,12 @@ import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
import com.snowplowanalytics.snowplow.enrich.common.utils.BlockerF

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.{ConfigFile, ParsedConfigs}
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Concurrency
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Concurrency, Telemetry => TelemetryConfig}
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.{Clients, Metrics}
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients.Client

import scala.concurrent.ExecutionContext
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Input.Kinesis

/**
* All allocated resources, configs and mutable variables necessary for running Enrich process
Expand All @@ -56,6 +59,7 @@ import scala.concurrent.ExecutionContext
* @param semaphore its permit is shared between enriching the events and updating the assets
* @param assetsState a main entity from [[Assets]] stream, controlling when assets
* have to be replaced with newer ones
* @param httpClient client used to perform HTTP requests
* @param blocker thread pool for blocking operations and enrichments themselves
* @param source stream of records containing the collector payloads
* @param sinkGood function that sinks enriched event
Expand All @@ -68,8 +72,11 @@ import scala.concurrent.ExecutionContext
* @param assetsUpdatePeriod time after which enrich assets should be refreshed
* @param goodAttributes fields from an enriched event to use as output message attributes
* @param piiAttributes fields from a PII event to use as output message attributes
* @param telemetryConfig configuration for telemetry
* @param processor identifies enrich asset in bad rows
* @param streamsSettings parameters used to configure the streams
* @param region region in the cloud where enrich runs
* @param cloud cloud where enrich runs (AWS or GCP)
* @tparam A type emitted by the source (e.g. `ConsumerRecord` for PubSub).
* getPayload must be defined for this type, as well as checkpointing
*/
Expand All @@ -79,6 +86,7 @@ final case class Environment[F[_], A](
enrichments: Ref[F, Environment.Enrichments[F]],
semaphore: Semaphore[F],
assetsState: Assets.State[F],
httpClient: HttpClient[F],
blocker: Blocker,
source: Stream[F, A],
sinkGood: AttributedByteSink[F],
Expand All @@ -91,8 +99,11 @@ final case class Environment[F[_], A](
assetsUpdatePeriod: Option[FiniteDuration],
goodAttributes: EnrichedEvent => Map[String, String],
piiAttributes: EnrichedEvent => Map[String, String],
telemetryConfig: TelemetryConfig,
processor: Processor,
streamsSettings: Environment.StreamsSettings
streamsSettings: Environment.StreamsSettings,
region: Option[String],
cloud: Option[Telemetry.Cloud]
)

object Environment {
Expand Down Expand Up @@ -134,7 +145,9 @@ object Environment {
checkpoint: List[A] => F[Unit],
getPayload: A => Array[Byte],
processor: Processor,
maxRecordSize: Int
maxRecordSize: Int,
cloud: Option[Telemetry.Cloud],
getRegion: => Option[String]
): Resource[F, Environment[F, A]] = {
val file = parsedConfigs.configFile
for {
Expand All @@ -159,6 +172,7 @@ object Environment {
enrichments,
sem,
assetsState,
http,
blocker,
source,
good,
Expand All @@ -171,8 +185,11 @@ object Environment {
file.assetsUpdatePeriod,
parsedConfigs.goodAttributes,
parsedConfigs.piiAttributes,
file.telemetry,
processor,
StreamsSettings(file.concurrency, maxRecordSize)
StreamsSettings(file.concurrency, maxRecordSize),
getRegionFromConfig(file).orElse(getRegion),
cloud
)
}

Expand All @@ -190,4 +207,12 @@ object Environment {
}

case class StreamsSettings(concurrency: Concurrency, maxRecordSize: Int)

/**
 * Extracts the region from the input section of the config file.
 *
 * @param file parsed configuration file
 * @return the region carried by a Kinesis input (which may itself be absent),
 *         `None` for any other kind of input
 */
private def getRegionFromConfig(file: ConfigFile): Option[String] =
  Some(file.input).collect {
    // Only the Kinesis input carries a region (3rd field); other inputs fall through to None
    case Kinesis(_, _, configuredRegion, _, _, _, _) => configuredRegion
  }.flatten
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ object Run {
checkpoint: List[A] => F[Unit],
mkClients: List[Blocker => Client[F]],
getPayload: A => Array[Byte],
maxRecordSize: Int
maxRecordSize: Int,
cloud: Option[Telemetry.Cloud],
getRegion: => Option[String]
): F[ExitCode] =
CliConfig.command(name, version, description).parse(args) match {
case Right(cli) =>
Expand Down Expand Up @@ -93,7 +95,9 @@ object Run {
_ => Sync[F].unit,
identity,
processor,
maxRecordSize
maxRecordSize,
cloud,
getRegion
)
runEnvironment[F, Array[Byte]](env)
case _ =>
Expand All @@ -110,7 +114,9 @@ object Run {
checkpoint,
getPayload,
processor,
maxRecordSize
maxRecordSize,
cloud,
getRegion
)
runEnvironment[F, A](env)
}
Expand Down Expand Up @@ -143,8 +149,9 @@ object Run {
val log = Logger[F].info("Running enrichment stream")
val enrich = Enrich.run[F, A](env)
val updates = Assets.run[F, A](env.blocker, env.semaphore, env.assetsUpdatePeriod, env.assetsState, env.enrichments)
val telemetry = Telemetry.run[F, A](env)
val reporting = env.metrics.report
val flow = enrich.merge(updates).merge(reporting)
val flow = enrich.merge(updates).merge(reporting).merge(telemetry)
log >> flow.compile.drain.as(ExitCode.Success).recoverWith {
case exception: Throwable =>
sendToSentry(exception, env.sentry) >>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright (c) 2021-2021 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich.common.fs2

import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import cats.data.NonEmptyList
import cats.implicits._

import cats.effect.{ConcurrentEffect, Resource, Sync, Timer}

import fs2.Stream

import org.http4s.client.{Client => HttpClient}

import _root_.io.circe.Json
import _root_.io.circe.Encoder
import _root_.io.circe.syntax._

import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}

import com.snowplowanalytics.snowplow.scalatracker.Tracker
import com.snowplowanalytics.snowplow.scalatracker.Emitter._
import com.snowplowanalytics.snowplow.scalatracker.Emitter.{Result => TrackerResult}
import com.snowplowanalytics.snowplow.scalatracker.emitters.http4s.Http4sEmitter

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Telemetry => TelemetryConfig}

/**
 * Periodically sends a telemetry heartbeat to a Snowplow collector.
 *
 * The heartbeat is a self-describing `oss_context` event built from the telemetry
 * section of the configuration and from the [[Environment]] (region, cloud, processor).
 */
object Telemetry {

  private implicit def unsafeLogger[F[_]: Sync]: Logger[F] =
    Slf4jLogger.getLogger[F]

  /**
   * Stream that sends one heartbeat event per `telemetryConfig.interval`.
   *
   * @param env environment providing the telemetry config, the HTTP client,
   *            the processor name/version, the region and the cloud
   * @return an empty stream if telemetry is disabled in the config,
   *         otherwise a stream emitting `()` after each heartbeat has been tracked and flushed
   */
  def run[F[_]: ConcurrentEffect: Timer, A](env: Environment[F, A]): Stream[F, Unit] =
    if (env.telemetryConfig.disable) Stream.empty.covary[F]
    else {
      // Built once: every heartbeat of this run carries the same payload,
      // including the same randomly generated appGeneratedId
      val heartbeat = makeHeartbeatEvent(
        env.telemetryConfig,
        env.region,
        env.cloud,
        env.processor.artifact,
        env.processor.version
      )
      val tracker = initTracker(env.telemetryConfig, env.processor.artifact, env.httpClient)
      Stream
        .fixedDelay[F](env.telemetryConfig.interval)
        .evalMap { _ =>
          // The tracker resource is acquired and released around each heartbeat
          tracker.use { t =>
            t.trackSelfDescribingEvent(unstructEvent = heartbeat) >>
              t.flushEmitters()
          }
        }
    }

  /**
   * Builds a tracker whose single emitter targets the collector from the telemetry config.
   *
   * @param config telemetry configuration (collector URI, port, https flag)
   * @param appName application name passed to the tracker
   * @param client HTTP client used by the emitter
   */
  private def initTracker[F[_]: ConcurrentEffect: Timer](
    config: TelemetryConfig,
    appName: String,
    client: HttpClient[F]
  ): Resource[F, Tracker[F]] =
    for {
      emitter <- Http4sEmitter.build(
                   EndpointParams(config.collectorUri, port = Some(config.collectorPort), https = config.secure),
                   client,
                   retryPolicy = RetryPolicy.MaxAttempts(10),
                   callback = Some(emitterCallback[F] _)
                 )
    } yield new Tracker(NonEmptyList.of(emitter), "tracker-telemetry", appName)

  /**
   * Callback given to the emitter: logs the outcome of each attempt to send a heartbeat.
   * The base collector URI (`getUri`) is used in every message so that successes and
   * failures refer to the collector in the same way.
   */
  private def emitterCallback[F[_]: Sync](
    params: EndpointParams,
    req: Request,
    res: TrackerResult
  ): F[Unit] =
    res match {
      case TrackerResult.Success(_) =>
        Logger[F].debug(s"Telemetry heartbeat successfully sent to ${params.getUri}")
      case TrackerResult.Failure(code) =>
        Logger[F].warn(s"Sending telemetry heartbeat got unexpected HTTP code $code from ${params.getUri}")
      case TrackerResult.TrackerFailure(exception) =>
        Logger[F].warn(
          s"Telemetry heartbeat failed to reach ${params.getUri} with following exception $exception after ${req.attempt} attempts"
        )
      case TrackerResult.RetriesExceeded(failure) =>
        Logger[F].error(s"Stopped trying to send telemetry heartbeat after following failure: $failure")
    }

  /**
   * Creates the self-describing heartbeat event
   * (`com.snowplowanalytics.oss/oss_context/jsonschema/1-0-1`).
   *
   * @param teleCfg telemetry configuration providing the user/module/instance identifiers
   * @param region region where the app runs, if known
   * @param cloud cloud where the app runs, if known
   * @param appName name of the application
   * @param appVersion version of the application
   * @return the event, with a freshly generated random UUID as `appGeneratedId`
   */
  private def makeHeartbeatEvent(
    teleCfg: TelemetryConfig,
    region: Option[String],
    cloud: Option[Cloud],
    appName: String,
    appVersion: String
  ): SelfDescribingData[Json] =
    SelfDescribingData(
      SchemaKey("com.snowplowanalytics.oss", "oss_context", "jsonschema", SchemaVer.Full(1, 0, 1)),
      Json.obj(
        "userProvidedId" -> teleCfg.userProvidedId.asJson,
        "autoGeneratedId" -> teleCfg.autoGeneratedId.asJson,
        "moduleName" -> teleCfg.moduleName.asJson,
        "moduleVersion" -> teleCfg.moduleVersion.asJson,
        "instanceId" -> teleCfg.instanceId.asJson,
        "appGeneratedId" -> java.util.UUID.randomUUID().toString.asJson,
        "cloud" -> cloud.asJson,
        "region" -> region.asJson,
        "applicationName" -> appName.asJson,
        "applicationVersion" -> appVersion.asJson
      )
    )

  /** Cloud on which the application runs. */
  sealed trait Cloud

  object Cloud {
    case object Aws extends Cloud
    case object Gcp extends Cloud

    /**
     * Encodes the cloud as an upper-case JSON string.
     * The mapping is explicit so that the wire value depends neither on the
     * case-object names nor on the JVM default locale, and so that the compiler
     * flags any new [[Cloud]] member that lacks an encoding.
     */
    implicit val encoder: Encoder[Cloud] = Encoder.encodeString.contramap[Cloud] {
      case Aws => "AWS"
      case Gcp => "GCP"
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import pureconfig.ConfigSource
import pureconfig.module.catseffect.syntax._
import pureconfig.module.circe._

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Concurrency, Input, Monitoring, Output, Outputs}
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Concurrency, Input, Monitoring, Output, Outputs, Telemetry}

/**
* Parsed HOCON configuration file
Expand All @@ -36,13 +36,15 @@ import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Concurrency,
* @param output wraps good, bad and pii outputs (PubSub, Kinesis, FS etc)
* @param assetsUpdatePeriod time after which assets should be updated, in minutes
* @param monitoring configuration for sentry and metrics
* @param telemetry configuration for telemetry
*/
final case class ConfigFile(
input: Input,
output: Outputs,
concurrency: Concurrency,
assetsUpdatePeriod: Option[FiniteDuration],
monitoring: Option[Monitoring]
monitoring: Option[Monitoring],
telemetry: Telemetry
)

object ConfigFile {
Expand All @@ -53,7 +55,7 @@ object ConfigFile {

implicit val configFileDecoder: Decoder[ConfigFile] =
deriveConfiguredDecoder[ConfigFile].emap {
case ConfigFile(_, _, _, Some(aup), _) if aup._1 <= 0L =>
case ConfigFile(_, _, _, Some(aup), _, _) if aup._1 <= 0L =>
"assetsUpdatePeriod in config file cannot be less than 0".asLeft // TODO: use newtype
// Remove pii output if streamName and region empty
case c @ ConfigFile(_, Outputs(good, Some(Output.Kinesis(s, r, _, _, _, _, _)), bad), _, _, _, _)
Expand Down
Loading

0 comments on commit a6f44ab

Please sign in to comment.