Skip to content

Commit

Permalink
common-fs2: put good bad and pii inside output {} in config (close #493)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Jan 26, 2022
1 parent bee91c3 commit 743da43
Show file tree
Hide file tree
Showing 11 changed files with 402 additions and 368 deletions.
12 changes: 7 additions & 5 deletions config/config.kinesis.minimal.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
"streamName": "collector-payloads"
}

"good": {
"streamName": "enriched"
}
"output": {
"good": {
"streamName": "enriched"
}

"bad": {
"streamName": "bad"
"bad": {
"streamName": "bad"
}
}
}
428 changes: 215 additions & 213 deletions config/config.kinesis.reference.hocon

Large diffs are not rendered by default.

66 changes: 34 additions & 32 deletions config/config.pubsub.hocon.sample
Original file line number Diff line number Diff line change
Expand Up @@ -9,43 +9,45 @@
# "dir": "/var/collector"
}

# Enriched events output
"good": {
"type": "PubSub"
"topic": "projects/test-project/topics/good-topic"

# Enriched event fields to add as PubSub message attributes.
"attributes": [ "app_id" ]

# Local FS supported for testing purposes
# "type": "FileSystem"
# "file": "/var/enriched"
# "maxBytes": 1000000
}
"output": {
# Enriched events output
"good": {
"type": "PubSub"
"topic": "projects/test-project/topics/good-topic"

# Enriched event fields to add as PubSub message attributes.
"attributes": [ "app_id" ]

# Local FS supported for testing purposes
# "type": "FileSystem"
# "file": "/var/enriched"
# "maxBytes": 1000000
}

# Pii events output
"pii": {
"type": "PubSub"
"topic": "projects/test-project/topics/pii-topic"
# Pii events output
"pii": {
"type": "PubSub"
"topic": "projects/test-project/topics/pii-topic"

# Enriched event fields to add as PubSub message attributes.
# "attributes": [ "app_id" ]
# Enriched event fields to add as PubSub message attributes.
# "attributes": [ "app_id" ]

# Local FS supported for testing purposes
# "type": "FileSystem"
# "file": "/var/pii"
# "maxBytes": 1000000
}
# Local FS supported for testing purposes
# "type": "FileSystem"
# "file": "/var/pii"
# "maxBytes": 1000000
}

# Bad rows output
"bad": {
"type": "PubSub"
"topic": "projects/test-project/topics/bad-topic"
# Bad rows output
"bad": {
"type": "PubSub"
"topic": "projects/test-project/topics/bad-topic"

# Local FS supported for testing purposes
# "type": "FileSystem"
# "file": "/var/bad"
# "maxBytes": 1000000
# Local FS supported for testing purposes
# "type": "FileSystem"
# "file": "/var/bad"
# "maxBytes": 1000000
}
}

# Optional. Concurrency of the app
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ object Run {
_ <- Logger[F].info(s"Initialising resources for $name $version")
processor = Processor(name, version)
file = parsed.configFile
sinkGood = initAttributedSink(blocker, file.good, file.monitoring, mkSinkGood)
sinkPii = file.pii.map(out => initAttributedSink(blocker, out, file.monitoring, mkSinkPii))
sinkBad = file.bad match {
sinkGood = initAttributedSink(blocker, file.output.good, file.monitoring, mkSinkGood)
sinkPii = file.output.pii.map(out => initAttributedSink(blocker, out, file.monitoring, mkSinkPii))
sinkBad = file.output.bad match {
case f: Output.FileSystem =>
Sink.fileSink[F](f, blocker)
case _ =>
mkSinkBad(blocker, file.bad, file.monitoring)
mkSinkBad(blocker, file.output.bad, file.monitoring)
}
clients = mkClients.map(mk => mk(blocker))
exit <- file.input match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,19 @@ import pureconfig.ConfigSource
import pureconfig.module.catseffect.syntax._
import pureconfig.module.circe._

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Concurrency, Input, Monitoring, Output}
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Concurrency, Input, Monitoring, Output, Outputs}

/**
* Parsed HOCON configuration file
*
* @param input input (PubSub, Kinesis etc)
* @param good good enriched output (PubSub, Kinesis, FS etc)
* @param pii pii enriched output (PubSub, Kinesis, FS etc)
* @param bad bad rows output (PubSub, Kinesis, FS etc)
* @param output wraps the good, pii and bad outputs (PubSub, Kinesis, FS etc.)
* @param concurrency concurrency settings of the app
* @param assetsUpdatePeriod time after which assets should be updated, in minutes
* @param monitoring configuration for sentry and metrics
*/
final case class ConfigFile(
input: Input,
good: Output,
pii: Option[Output],
bad: Output,
output: Outputs,
concurrency: Concurrency,
assetsUpdatePeriod: Option[FiniteDuration],
monitoring: Option[Monitoring]
Expand All @@ -57,8 +53,11 @@ object ConfigFile {

implicit val configFileDecoder: Decoder[ConfigFile] =
deriveConfiguredDecoder[ConfigFile].emap {
case ConfigFile(_, _, _, _, _, Some(aup), _) if aup._1 <= 0L =>
case ConfigFile(_, _, _, Some(aup), _) if aup._1 <= 0L =>
"assetsUpdatePeriod in config file cannot be less than 0".asLeft // TODO: use newtype
// Drop the pii output when both its streamName and region are empty
case c @ ConfigFile(_, Outputs(good, Some(Output.Kinesis(s, r, _, _, _, _, _, _, _, _, _, _, _)), bad), _, _, _)
if(s.isEmpty && r.isEmpty) => c.copy(output = Outputs(good, None, bad)).asRight
case other => other.asRight
}
implicit val configFileEncoder: Encoder[ConfigFile] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,11 @@ object ParsedConfigs {
}
configFile <- ConfigFile.parse[F](config.config)
configFile <- validateConfig[F](configFile)
goodAttributes = outputAttributes(configFile.good)
piiAttributes = configFile.pii.map(outputAttributes).getOrElse { _: EnrichedEvent => Map.empty[String, String] }
_ <- EitherT.liftF(
Logger[F].info(s"Parsed config file: ${configFile}")
)
goodAttributes = outputAttributes(configFile.output.good)
piiAttributes = configFile.output.pii.map(outputAttributes).getOrElse { _: EnrichedEvent => Map.empty[String, String] }
client <- Client.parseDefault[F](igluJson).leftMap(x => show"Cannot decode Iglu Client. $x")
_ <- EitherT.liftF(
Logger[F].info(show"Parsed Iglu Client with following registries: ${client.resolver.repos.map(_.config.name).mkString(", ")}")
Expand All @@ -87,8 +90,8 @@ object ParsedConfigs {
} yield ParsedConfigs(igluJson, configs, configFile, goodAttributes, piiAttributes)

private[config] def validateConfig[F[_]: Applicative](configFile: ConfigFile): EitherT[F, String, ConfigFile] = {
val goodCheck: ValidationResult[OutputConfig] = validateAttributes(configFile.good)
val optPiiCheck: ValidationResult[Option[OutputConfig]] = configFile.pii.map(validateAttributes).sequence
val goodCheck: ValidationResult[OutputConfig] = validateAttributes(configFile.output.good)
val optPiiCheck: ValidationResult[Option[OutputConfig]] = configFile.output.pii.map(validateAttributes).sequence

(goodCheck, optPiiCheck)
.mapN { case (_, _) => configFile }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@ object io {
deriveConfiguredEncoder[Input]
}

/**
 * Groups the outputs that sit under the `output` section of the config file
 *
 * @param good good enriched output
 * @param pii pii enriched output, optional
 * @param bad bad rows output
 */
final case class Outputs(good: Output, pii: Option[Output], bad: Output)
object Outputs {
  // Codecs are derived from the case class fields, so the JSON/HOCON keys are `good`, `pii` and `bad`
  implicit val outputsDecoder: Decoder[Outputs] = deriveConfiguredDecoder[Outputs]
  implicit val outputsEncoder: Encoder[Outputs] = deriveConfiguredEncoder[Outputs]
}

sealed trait Output

object Output {
Expand Down Expand Up @@ -234,6 +240,8 @@ object io {
case _ =>
s"Topic must conform projects/project-name/topics/topic-name format, $top given".asLeft
}
case Kinesis(s, r, _, _, _, _, _, _, _, _, _, _, _) if(s.isEmpty && r.nonEmpty) =>
"streamName needs to be set".asLeft
case other => other.asRight
}
.emap {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,20 @@ class CliConfigSpec extends Specification with CatsIO {
type = "PubSub"
subscription = "projects/test-project/subscriptions/inputSub"
}
good = {
type = "PubSub"
topic = "projects/test-project/topics/good-topic"
}
pii = {
type = "PubSub"
topic = "projects/test-project/topics/pii-topic"
attributes = [ "app_id", "platform" ]
}
bad = {
type = "PubSub"
topic = "projects/test-project/topics/bad-topic"
output = {
good = {
type = "PubSub"
topic = "projects/test-project/topics/good-topic"
}
pii = {
type = "PubSub"
topic = "projects/test-project/topics/pii-topic"
attributes = [ "app_id", "platform" ]
}
bad = {
type = "PubSub"
topic = "projects/test-project/topics/bad-topic"
}
}
concurrency = {
enrich = 256
Expand All @@ -64,9 +66,11 @@ class CliConfigSpec extends Specification with CatsIO {

val expected = ConfigFile(
io.Input.PubSub("projects/test-project/subscriptions/inputSub", None, None),
io.Output.PubSub("projects/test-project/topics/good-topic", None, None, None, None, None),
Some(io.Output.PubSub("projects/test-project/topics/pii-topic", Some(Set("app_id", "platform")), None, None, None, None)),
io.Output.PubSub("projects/test-project/topics/bad-topic", None, None, None, None, None),
io.Outputs(
io.Output.PubSub("projects/test-project/topics/good-topic", None, None, None, None, None),
Some(io.Output.PubSub("projects/test-project/topics/pii-topic", Some(Set("app_id", "platform")), None, None, None, None)),
io.Output.PubSub("projects/test-project/topics/bad-topic", None, None, None, None, None)
),
io.Concurrency(256, 3),
None,
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ class ConfigFileSpec extends Specification with CatsIO {
val configPath = Paths.get(getClass.getResource("/config.pubsub.hocon.sample").toURI)
val expected = ConfigFile(
io.Input.PubSub("projects/test-project/subscriptions/inputSub", None, None),
io.Output.PubSub("projects/test-project/topics/good-topic", Some(Set("app_id")), None, None, None, None),
Some(io.Output.PubSub("projects/test-project/topics/pii-topic", None, None, None, None, None)),
io.Output.PubSub("projects/test-project/topics/bad-topic", None, None, None, None, None),
io.Outputs(
io.Output.PubSub("projects/test-project/topics/good-topic", Some(Set("app_id")), None, None, None, None),
Some(io.Output.PubSub("projects/test-project/topics/pii-topic", None, None, None, None, None)),
io.Output.PubSub("projects/test-project/topics/bad-topic", None, None, None, None, None)
),
io.Concurrency(256, 3),
Some(7.days),
Some(
Expand Down Expand Up @@ -69,24 +71,41 @@ class ConfigFileSpec extends Specification with CatsIO {
None,
None
),
io.Output.Kinesis(
"enriched",
Some("eu-central-1"),
None,
io.Output.BackoffPolicy(100.millis, 10.seconds),
100.millis,
io.Output.Collection(500, 5242880),
None,
24,
"warning",
None,
None,
None,
None
),
Some(
io.Outputs(
io.Output.Kinesis(
"pii",
"enriched",
Some("eu-central-1"),
None,
io.Output.BackoffPolicy(100.millis, 10.seconds),
100.millis,
io.Output.Collection(500, 5242880),
None,
24,
"warning",
None,
None,
None,
None
),
Some(
io.Output.Kinesis(
"pii",
Some("eu-central-1"),
None,
io.Output.BackoffPolicy(100.millis, 10.seconds),
100.millis,
io.Output.Collection(500, 5242880),
None,
24,
"warning",
None,
None,
None,
None
)
),
io.Output.Kinesis(
"bad",
Some("eu-central-1"),
None,
io.Output.BackoffPolicy(100.millis, 10.seconds),
Expand All @@ -101,21 +120,6 @@ class ConfigFileSpec extends Specification with CatsIO {
None
)
),
io.Output.Kinesis(
"bad",
Some("eu-central-1"),
None,
io.Output.BackoffPolicy(100.millis, 10.seconds),
100.millis,
io.Output.Collection(500, 5242880),
None,
24,
"warning",
None,
None,
None,
None
),
io.Concurrency(256, 1),
Some(7.days),
Some(
Expand All @@ -141,17 +145,19 @@ class ConfigFileSpec extends Specification with CatsIO {
"type": "PubSub",
"subscription": "projects/test-project/subscriptions/inputSub"
},
"good": {
"type": "PubSub",
"topic": "projects/test-project/topics/good-topic"
},
"pii": {
"type": "PubSub",
"topic": "projects/test-project/topics/pii-topic"
},
"bad": {
"type": "PubSub",
"topic": "projects/test-project/topics/bad-topic"
"output": {
"good": {
"type": "PubSub",
"topic": "projects/test-project/topics/good-topic"
},
"pii": {
"type": "PubSub",
"topic": "projects/test-project/topics/pii-topic"
},
"bad": {
"type": "PubSub",
"topic": "projects/test-project/topics/bad-topic"
}
},
"concurrency": {
"enrich": 256,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ class ParsedConfigsSpec extends Specification with CatsIO {

val configFile = ConfigFile(
io.Input.PubSub("projects/test-project/subscriptions/inputSub", None, None),
io.Output.PubSub("projects/test-project/topics/good-topic", Some(Set("app_id", invalidAttr1)), None, None, None, None),
Some(io.Output.PubSub("projects/test-project/topics/pii-topic", Some(Set("app_id", invalidAttr2)), None, None, None, None)),
io.Output.PubSub("projects/test-project/topics/bad-topic", None, None, None, None, None),
io.Outputs(
io.Output.PubSub("projects/test-project/topics/good-topic", Some(Set("app_id", invalidAttr1)), None, None, None, None),
Some(io.Output.PubSub("projects/test-project/topics/pii-topic", Some(Set("app_id", invalidAttr2)), None, None, None, None)),
io.Output.PubSub("projects/test-project/topics/bad-topic", None, None, None, None, None)
),
io.Concurrency(10000, 64),
Some(7.days),
Some(
Expand Down
Loading

0 comments on commit 743da43

Please sign in to comment.