diff --git a/config/config.kinesis.minimal.hocon b/config/config.kinesis.minimal.hocon
index 7d1af227a..d6ca423e2 100644
--- a/config/config.kinesis.minimal.hocon
+++ b/config/config.kinesis.minimal.hocon
@@ -3,11 +3,13 @@
     "streamName": "collector-payloads"
   }
 
-  "good": {
-    "streamName": "enriched"
-  }
+  "output": {
+    "good": {
+      "streamName": "enriched"
+    }
 
-  "bad": {
-    "streamName": "bad"
+    "bad": {
+      "streamName": "bad"
+    }
   }
 }
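With this change every sink lives under a single `output` object; `pii` is optional and, when configured, sits alongside `good` and `bad` in the same shape. A minimal sketch of the new layout (stream names are illustrative):

    "output": {
      "good": {
        "streamName": "enriched"
      }
      # optional, only if a pii stream is configured
      "pii": {
        "streamName": "pii"
      }
      "bad": {
        "streamName": "bad"
      }
    }
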
diff --git a/config/config.kinesis.reference.hocon b/config/config.kinesis.reference.hocon
index 1d91b7f7f..5e382837c 100644
--- a/config/config.kinesis.reference.hocon
+++ b/config/config.kinesis.reference.hocon
@@ -46,148 +46,150 @@
     # dynamodbCustomEndpoint = "http://localhost:4569"
   }
 
-  # Enriched events output
-  "good": {
-    "type": "Kinesis"
-
-    # Name of the Kinesis stream to write to
-    "streamName": "enriched"
-
-    # Optional. Region where the Kinesis stream is located
-    # This field is optional if it can be resolved with AWS region provider chain.
-    # It checks places like env variables, system properties, AWS profile file.
-    # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
-    "region": "eu-central-1"
+  "output": {
+    # Enriched events output
+    "good": {
+      "type": "Kinesis"
+
+      # Name of the Kinesis stream to write to
+      "streamName": "enriched"
+
+      # Optional. Region where the Kinesis stream is located
+      # This field is optional if it can be resolved with AWS region provider chain.
+      # It checks places like env variables, system properties, AWS profile file.
+      # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
+      "region": "eu-central-1"
+
+      # Optional. How the output stream/topic will be partitioned in Kinesis
+      # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
+      # user_ipaddress, domain_sessionid, user_fingerprint
+      # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
+      # possible partition keys correspond to.
+      # Otherwise, the partition key will be a random UUID.
+      # "partitionKey" = "user_id"
+
+      # Optional. Minimum and maximum backoff periods
+      "backoffPolicy": {
+        "minBackoff": 100 milliseconds
+        "maxBackoff": 10 seconds
+      }
 
-    # Optional. How the output stream/topic will be partitioned in Kinesis
-    # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
-    # user_ipaddress, domain_sessionid, user_fingerprint
-    # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
-    # possible parittion keys correspond to.
-    # Otherwise, the partition key will be a random UUID.
-    # "partitionKey" = "user_id"
-
-    # Optional. Minimum and maximum backoff periods
-    "backoffPolicy": {
-      "minBackoff": 100 milliseconds
-      "maxBackoff": 10 seconds
-    }
+      # Optional. Maximum amount of time an enriched event may spend being buffered before it gets sent
+      "maxBufferedTime": 100 millis
 
-    # Optional. Maximum amount of time an enriched event may spend being buffered before it gets sent
-    "maxBufferedTime": 100 millis
+      # Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
+      "collection": {
+        # Maximum number of Kinesis records to pack into a PutRecords request
+        "maxCount": 500
 
-    # Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
-    "collection": {
-      # Maximum number of Kinesis records to pack into a PutRecords request
-      "maxCount": 500
+        # Maximum amount of data to send with a PutRecords request
+        "maxSize": 5242880
+      }
 
-      # Maximum amount of data to send with a PutRecords request
-      "maxSize": 5242880
+      # Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
+      # If not specified, aggregation is deactivated
+      #"aggregation": {
+      #  # Maximum number of enriched events to pack into an aggregated Kinesis record
+      #  "maxCount": 4294967295
+      #
+      #  # Maximum number of bytes to pack into an aggregated Kinesis record
+      #  "maxSize": 51200
+      #}
     }
 
-    # Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
-    # If not specified, aggregation is deactivated
-    #"aggregation": {
-    #  # Maximum number of enriched events to pack into an aggregated Kinesis record
-    #  "maxCount": 4294967295
-    #
-    #  # Maximum number of bytes to pack into an aggregated Kinesis record
-    #  "maxSize": 51200
-    #}
-  }
-
-  # Pii events output
-  "pii": {
-    "type": "Kinesis"
-
-    # Name of the Kinesis stream to write to
-    "streamName": "pii"
-
-    # Optional. Region where the Kinesis stream is located
-    # This field is optional if it can be resolved with AWS region provider chain.
-    # It checks places like env variables, system properties, AWS profile file.
-    # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
-    "region": "eu-central-1"
+    # Pii events output
+    "pii": {
+      "type": "Kinesis"
+
+      # Name of the Kinesis stream to write to
+      "streamName": "pii"
+
+      # Optional. Region where the Kinesis stream is located
+      # This field is optional if it can be resolved with AWS region provider chain.
+      # It checks places like env variables, system properties, AWS profile file.
+      # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
+      "region": "eu-central-1"
+
+      # Optional. How the output stream/topic will be partitioned in Kinesis
+      # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
+      # user_ipaddress, domain_sessionid, user_fingerprint
+      # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
+      # possible partition keys correspond to.
+      # Otherwise, the partition key will be a random UUID.
+      # "partitionKey" = "user_id"
+
+      # Optional. Minimum and maximum backoff periods
+      "backoffPolicy": {
+        "minBackoff": 100 milliseconds
+        "maxBackoff": 10 seconds
+      }
 
-    # Optional. How the output stream/topic will be partitioned in Kinesis
-    # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
-    # user_ipaddress, domain_sessionid, user_fingerprint
-    # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
-    # possible parittion keys correspond to.
-    # Otherwise, the partition key will be a random UUID.
-    # "partitionKey" = "user_id"
-
-    # Optional. Minimum and maximum backoff periods
-    "backoffPolicy": {
-      "minBackoff": 100 milliseconds
-      "maxBackoff": 10 seconds
-    }
+      # Optional. Maximum amount of time an enriched event may spend being buffered before it gets sent
+      "maxBufferedTime": 100 millis
 
-    # Optional. Maximum amount of time an enriched event may spend being buffered before it gets sent
-    "maxBufferedTime": 100 millis
+      # Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
+      "collection": {
+        # Maximum number of Kinesis records to pack into a PutRecords request
+        "maxCount": 500
 
-    # Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
-    "collection": {
-      # Maximum number of Kinesis records to pack into a PutRecords request
-      "maxCount": 500
+        # Maximum amount of data to send with a PutRecords request
+        "maxSize": 5242880
+      }
 
-      # Maximum amount of data to send with a PutRecords request
-      "maxSize": 5242880
+      # Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
+      # If not specified, aggregation is deactivated
+      #"aggregation": {
+      #  # Maximum number of enriched events to pack into an aggregated Kinesis record
+      #  "maxCount": 4294967295
+      #
+      #  # Maximum number of bytes to pack into an aggregated Kinesis record
+      #  "maxSize": 51200
+      #}
     }
 
-    # Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
-    # If not specified, aggregation is deactivated
-    #"aggregation": {
-    #  # Maximum number of enriched events to pack into an aggregated Kinesis record
-    #  "maxCount": 4294967295
-    #
-    #  # Maximum number of bytes to pack into an aggregated Kinesis record
-    #  "maxSize": 51200
-    #}
-  }
 
+    # Bad rows output
+    "bad": {
+      "type": "Kinesis"
 
-  # Bad rows output
-  "bad": {
-    "type": "Kinesis"
+      # Name of the Kinesis stream to write to
+      "streamName": "bad"
 
-    # Name of the Kinesis stream to write to
-    "streamName": "bad"
+      # Optional. Region where the Kinesis stream is located
+      # This field is optional if it can be resolved with AWS region provider chain.
+      # It checks places like env variables, system properties, AWS profile file.
+      # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
+      "region": "eu-central-1"
 
-    # Optional. Region where the Kinesis stream is located
-    # This field is optional if it can be resolved with AWS region provider chain.
-    # It checks places like env variables, system properties, AWS profile file.
-    # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
-    "region": "eu-central-1"
+      # Optional. Minimum and maximum backoff periods
+      "backoffPolicy": {
+        "minBackoff": 100 milliseconds
+        "maxBackoff": 10 seconds
+      }
 
-    # Optional. Minimum and maximum backoff periods
-    "backoffPolicy": {
-      "minBackoff": 100 milliseconds
-      "maxBackoff": 10 seconds
-    }
+      # Optional. Maximum amount of time an enriched event may spend being buffered before it gets sent
+      "maxBufferedTime": 100 millis
 
-    # Optional. Maximum amount of time an enriched event may spend being buffered before it gets sent
-    "maxBufferedTime": 100 millis
+      # Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
+      "collection": {
+        # Maximum number of Kinesis records to pack into a PutRecords request
+        "maxCount": 500
 
-    # Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
-    "collection": {
-      # Maximum number of Kinesis records to pack into a PutRecords request
-      "maxCount": 500
+        # Maximum amount of data to send with a PutRecords request
+        "maxSize": 5242880
+      }
 
-      # Maximum amount of data to send with a PutRecords request
-      "maxSize": 5242880
+      # Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
+      # If not specified, aggregation is deactivated
+      #"aggregation": {
+      #  # Maximum number of enriched events to pack into an aggregated Kinesis record
+      #  "maxCount": 4294967295
+      #
+      #  # Maximum number of bytes to pack into an aggregated Kinesis record
+      #  "maxSize": 51200
+      #}
     }
-
-    # Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
-    # If not specified, aggregation is deactivated
-    #"aggregation": {
-    #  # Maximum number of enriched events to pack into an aggregated Kinesis record
-    #  "maxCount": 4294967295
-    #
-    #  # Maximum number of bytes to pack into an aggregated Kinesis record
-    #  "maxSize": 51200
-    #}
   }
- 
+
   # Optional. Concurrency of the app
   "concurrency" : {
     # Number of events that can get enriched at the same time within a chunk
@@ -195,34 +197,34 @@
     # Number of chunks that can get sunk at the same time
     "sink": 1
   }
- 
+
   # Optional, period after which enrich assets should be checked for updates
   # no assets will be updated if the key is absent
   "assetsUpdatePeriod": "7 days"
- 
+
   "monitoring": {
- 
+
     # Optional, for tracking runtime exceptions
     "sentry": {
       "dsn": "http://sentry.acme.com"
     }
- 
+
     # Optional, configure how metrics are reported
     "metrics": {
-      # Optional. Send metrics to a StatsD server on localhost
+      # Optional. Send metrics to a StatsD server
      "statsd": {
        "hostname": "localhost"
        "port": 8125
- 
+
        # Required, how frequently to report metrics
        "period": "10 seconds"
- 
+
        # Any key-value pairs to be tagged on every StatsD metric
        "tags": {
          "app": enrich
        }
- 
+
        # Optional, override the default metric prefix
        # "prefix": "snowplow.enrich."
      }
@@ -230,7 +232,7 @@
     # Optional. Log to stdout using Slf4j
     "stdout": {
       "period": "10 seconds"
- 
+
       # Optional, override the default metric prefix
       # "prefix": "snowplow.enrich."
     }
diff --git a/config/config.pubsub.hocon.sample b/config/config.pubsub.hocon.sample
index 3625a654a..541c745db 100644
--- a/config/config.pubsub.hocon.sample
+++ b/config/config.pubsub.hocon.sample
@@ -9,43 +9,45 @@
   #   "dir": "/var/collector"
   }
 
-  # Enriched events output
-  "good": {
-    "type": "PubSub"
-    "topic": "projects/test-project/topics/good-topic"
-
-    # Enriched event fields to add as PubSub message attributes.
-    "attributes": [ "app_id" ]
-
-    # Local FS supported for testing purposes
-    # "type": "FileSystem"
-    # "file": "/var/enriched"
-    # "maxBytes": 1000000
-  }
+  "output": {
+    # Enriched events output
+    "good": {
+      "type": "PubSub"
+      "topic": "projects/test-project/topics/good-topic"
+
+      # Enriched event fields to add as PubSub message attributes.
+      "attributes": [ "app_id" ]
+
+      # Local FS supported for testing purposes
+      # "type": "FileSystem"
+      # "file": "/var/enriched"
+      # "maxBytes": 1000000
+    }
 
-  # Pii events output
-  "pii": {
-    "type": "PubSub"
-    "topic": "projects/test-project/topics/pii-topic"
+    # Pii events output
+    "pii": {
+      "type": "PubSub"
+      "topic": "projects/test-project/topics/pii-topic"
 
-    # Enriched event fields to add as PubSub message attributes.
-    # "attributes": [ "app_id" ]
+      # Enriched event fields to add as PubSub message attributes.
+      # "attributes": [ "app_id" ]
 
-    # Local FS supported for testing purposes
-    # "type": "FileSystem"
-    # "file": "/var/pii"
-    # "maxBytes": 1000000
-  }
+      # Local FS supported for testing purposes
+      # "type": "FileSystem"
+      # "file": "/var/pii"
+      # "maxBytes": 1000000
+    }
 
-  # Bad rows output
-  "bad": {
-    "type": "PubSub"
-    "topic": "projects/test-project/topics/bad-topic"
+    # Bad rows output
+    "bad": {
+      "type": "PubSub"
+      "topic": "projects/test-project/topics/bad-topic"
 
-    # Local FS supported for testing purposes
-    # "type": "FileSystem"
-    # "file": "/var/bad"
-    # "maxBytes": 1000000
+      # Local FS supported for testing purposes
+      # "type": "FileSystem"
+      # "file": "/var/bad"
+      # "maxBytes": 1000000
+    }
   }
 
 # Optional. Concurrency of the app
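The `attributes` array above names enriched-event fields whose values are attached as PubSub message attributes on the good (and optionally pii) output. A rough sketch of the idea; the `fieldValue` lookup is hypothetical, not the project's actual API:

    // Hypothetical sketch: project the configured attribute names
    // out of an enriched event into PubSub message attributes.
    def messageAttributes(
      configured: Set[String],             // e.g. Set("app_id")
      fieldValue: String => Option[String] // lookup on the enriched event
    ): Map[String, String] =
      configured.flatMap(name => fieldValue(name).map(name -> _)).toMap
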
diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala
index 69a8c0fc2..ce99434f4 100644
--- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala
+++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala
@@ -69,13 +69,13 @@ object Run {
       _ <- Logger[F].info(s"Initialising resources for $name $version")
       processor = Processor(name, version)
       file = parsed.configFile
-      sinkGood = initAttributedSink(blocker, file.good, file.monitoring, mkSinkGood)
-      sinkPii = file.pii.map(out => initAttributedSink(blocker, out, file.monitoring, mkSinkPii))
-      sinkBad = file.bad match {
+      sinkGood = initAttributedSink(blocker, file.output.good, file.monitoring, mkSinkGood)
+      sinkPii = file.output.pii.map(out => initAttributedSink(blocker, out, file.monitoring, mkSinkPii))
+      sinkBad = file.output.bad match {
                   case f: Output.FileSystem =>
                     Sink.fileSink[F](f, blocker)
                   case _ =>
-                    mkSinkBad(blocker, file.bad, file.monitoring)
+                    mkSinkBad(blocker, file.output.bad, file.monitoring)
                 }
       clients = mkClients.map(mk => mk(blocker))
       exit <- file.input match {
+ # "attributes": [ "app_id" ] - # Local FS supported for testing purposes - # "type": "FileSystem" - # "file": "/var/pii" - # "maxBytes": 1000000 - } + # Local FS supported for testing purposes + # "type": "FileSystem" + # "file": "/var/pii" + # "maxBytes": 1000000 + } - # Bad rows output - "bad": { - "type": "PubSub" - "topic": "projects/test-project/topics/bad-topic" + # Bad rows output + "bad": { + "type": "PubSub" + "topic": "projects/test-project/topics/bad-topic" - # Local FS supported for testing purposes - # "type": "FileSystem" - # "file": "/var/bad" - # "maxBytes": 1000000 + # Local FS supported for testing purposes + # "type": "FileSystem" + # "file": "/var/bad" + # "maxBytes": 1000000 + } } # Optional. Concurrency of the app diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala index 69a8c0fc2..ce99434f4 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala @@ -69,13 +69,13 @@ object Run { _ <- Logger[F].info(s"Initialising resources for $name $version") processor = Processor(name, version) file = parsed.configFile - sinkGood = initAttributedSink(blocker, file.good, file.monitoring, mkSinkGood) - sinkPii = file.pii.map(out => initAttributedSink(blocker, out, file.monitoring, mkSinkPii)) - sinkBad = file.bad match { + sinkGood = initAttributedSink(blocker, file.output.good, file.monitoring, mkSinkGood) + sinkPii = file.output.pii.map(out => initAttributedSink(blocker, out, file.monitoring, mkSinkPii)) + sinkBad = file.output.bad match { case f: Output.FileSystem => Sink.fileSink[F](f, blocker) case _ => - mkSinkBad(blocker, file.bad, file.monitoring) + mkSinkBad(blocker, file.output.bad, file.monitoring) } clients = mkClients.map(mk => mk(blocker)) exit <- file.input match { diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFile.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFile.scala index cfd8234da..7e484d8a2 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFile.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFile.scala @@ -27,23 +27,19 @@ import pureconfig.ConfigSource import pureconfig.module.catseffect.syntax._ import pureconfig.module.circe._ -import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Concurrency, Input, Monitoring, Output} +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Concurrency, Input, Monitoring, Output, Outputs} /** * Parsed HOCON configuration file * * @param input input (PubSub, Kinesis etc) - * @param good good enriched output (PubSub, Kinesis, FS etc) - * @param pii pii enriched output (PubSub, Kinesis, FS etc) - * @param bad bad rows output (PubSub, Kinesis, FS etc) + * @param output wraps good bad and pii outputs (PubSub, Kinesis, FS etc) * @param assetsUpdatePeriod time after which assets should be updated, in minutes * @param monitoring configuration for sentry and metrics */ final case class ConfigFile( input: Input, - good: Output, - pii: Option[Output], - bad: Output, + output: Outputs, concurrency: Concurrency, assetsUpdatePeriod: Option[FiniteDuration], monitoring: Option[Monitoring] 
diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigs.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigs.scala
index 45f3c60be..2ec84fe94 100644
--- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigs.scala
+++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigs.scala
@@ -74,8 +74,11 @@ object ParsedConfigs {
         }
       configFile <- ConfigFile.parse[F](config.config)
       configFile <- validateConfig[F](configFile)
-      goodAttributes = outputAttributes(configFile.good)
-      piiAttributes = configFile.pii.map(outputAttributes).getOrElse { _: EnrichedEvent => Map.empty[String, String] }
+      _ <- EitherT.liftF(
+             Logger[F].info(s"Parsed config file: ${configFile}")
+           )
+      goodAttributes = outputAttributes(configFile.output.good)
+      piiAttributes = configFile.output.pii.map(outputAttributes).getOrElse { _: EnrichedEvent => Map.empty[String, String] }
       client <- Client.parseDefault[F](igluJson).leftMap(x => show"Cannot decode Iglu Client. $x")
       _ <- EitherT.liftF(
              Logger[F].info(show"Parsed Iglu Client with following registries: ${client.resolver.repos.map(_.config.name).mkString(", ")}")
@@ -87,8 +90,8 @@ object ParsedConfigs {
     } yield ParsedConfigs(igluJson, configs, configFile, goodAttributes, piiAttributes)
 
   private[config] def validateConfig[F[_]: Applicative](configFile: ConfigFile): EitherT[F, String, ConfigFile] = {
-    val goodCheck: ValidationResult[OutputConfig] = validateAttributes(configFile.good)
-    val optPiiCheck: ValidationResult[Option[OutputConfig]] = configFile.pii.map(validateAttributes).sequence
+    val goodCheck: ValidationResult[OutputConfig] = validateAttributes(configFile.output.good)
+    val optPiiCheck: ValidationResult[Option[OutputConfig]] = configFile.output.pii.map(validateAttributes).sequence
 
     (goodCheck, optPiiCheck)
       .mapN { case (_, _) => configFile }
$x") _ <- EitherT.liftF( Logger[F].info(show"Parsed Iglu Client with following registries: ${client.resolver.repos.map(_.config.name).mkString(", ")}") @@ -87,8 +90,8 @@ object ParsedConfigs { } yield ParsedConfigs(igluJson, configs, configFile, goodAttributes, piiAttributes) private[config] def validateConfig[F[_]: Applicative](configFile: ConfigFile): EitherT[F, String, ConfigFile] = { - val goodCheck: ValidationResult[OutputConfig] = validateAttributes(configFile.good) - val optPiiCheck: ValidationResult[Option[OutputConfig]] = configFile.pii.map(validateAttributes).sequence + val goodCheck: ValidationResult[OutputConfig] = validateAttributes(configFile.output.good) + val optPiiCheck: ValidationResult[Option[OutputConfig]] = configFile.output.pii.map(validateAttributes).sequence (goodCheck, optPiiCheck) .mapN { case (_, _) => configFile } diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala index 15eefe46f..2abb8a94c 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala @@ -161,6 +161,12 @@ object io { deriveConfiguredEncoder[Input] } + case class Outputs(good: Output, pii: Option[Output], bad: Output) + object Outputs { + implicit val outputsDecoder: Decoder[Outputs] = deriveConfiguredDecoder[Outputs] + implicit val outputsEncoder: Encoder[Outputs] = deriveConfiguredEncoder[Outputs] + } + sealed trait Output object Output { @@ -225,6 +231,8 @@ object io { case _ => s"Topic must conform projects/project-name/topics/topic-name format, $top given".asLeft } + case Kinesis(s, r, _, _, _, _, _) if(s.isEmpty && r.nonEmpty) => + "streamName needs to be set".asLeft case other => other.asRight } .emap { diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/CliConfigSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/CliConfigSpec.scala index 6395ba8e4..0230945ca 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/CliConfigSpec.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/CliConfigSpec.scala @@ -42,18 +42,20 @@ class CliConfigSpec extends Specification with CatsIO { type = "PubSub" subscription = "projects/test-project/subscriptions/inputSub" } - good = { - type = "PubSub" - topic = "projects/test-project/topics/good-topic" - } - pii = { - type = "PubSub" - topic = "projects/test-project/topics/pii-topic" - attributes = [ "app_id", "platform" ] - } - bad = { - type = "PubSub" - topic = "projects/test-project/topics/bad-topic" + output = { + good = { + type = "PubSub" + topic = "projects/test-project/topics/good-topic" + } + pii = { + type = "PubSub" + topic = "projects/test-project/topics/pii-topic" + attributes = [ "app_id", "platform" ] + } + bad = { + type = "PubSub" + topic = "projects/test-project/topics/bad-topic" + } } concurrency = { enrich = 256 @@ -64,9 +66,11 @@ class CliConfigSpec extends Specification with CatsIO { val expected = ConfigFile( io.Input.PubSub("projects/test-project/subscriptions/inputSub", None, None), - io.Output.PubSub("projects/test-project/topics/good-topic", None, None, None, None, None), - 
diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/CliConfigSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/CliConfigSpec.scala
index 6395ba8e4..0230945ca 100644
--- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/CliConfigSpec.scala
+++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/CliConfigSpec.scala
@@ -42,18 +42,20 @@ class CliConfigSpec extends Specification with CatsIO {
         type = "PubSub"
         subscription = "projects/test-project/subscriptions/inputSub"
       }
-      good = {
-        type = "PubSub"
-        topic = "projects/test-project/topics/good-topic"
-      }
-      pii = {
-        type = "PubSub"
-        topic = "projects/test-project/topics/pii-topic"
-        attributes = [ "app_id", "platform" ]
-      }
-      bad = {
-        type = "PubSub"
-        topic = "projects/test-project/topics/bad-topic"
+      output = {
+        good = {
+          type = "PubSub"
+          topic = "projects/test-project/topics/good-topic"
+        }
+        pii = {
+          type = "PubSub"
+          topic = "projects/test-project/topics/pii-topic"
+          attributes = [ "app_id", "platform" ]
+        }
+        bad = {
+          type = "PubSub"
+          topic = "projects/test-project/topics/bad-topic"
+        }
       }
       concurrency = {
         enrich = 256
@@ -64,9 +66,11 @@ class CliConfigSpec extends Specification with CatsIO {
 
     val expected = ConfigFile(
       io.Input.PubSub("projects/test-project/subscriptions/inputSub", None, None),
-      io.Output.PubSub("projects/test-project/topics/good-topic", None, None, None, None, None),
-      Some(io.Output.PubSub("projects/test-project/topics/pii-topic", Some(Set("app_id", "platform")), None, None, None, None)),
-      io.Output.PubSub("projects/test-project/topics/bad-topic", None, None, None, None, None),
+      io.Outputs(
+        io.Output.PubSub("projects/test-project/topics/good-topic", None, None, None, None, None),
+        Some(io.Output.PubSub("projects/test-project/topics/pii-topic", Some(Set("app_id", "platform")), None, None, None, None)),
+        io.Output.PubSub("projects/test-project/topics/bad-topic", None, None, None, None, None)
+      ),
       io.Concurrency(256, 3),
       None,
       None
diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala
index 2bb6da910..8d50ace3c 100644
--- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala
+++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala
@@ -33,9 +33,11 @@ class ConfigFileSpec extends Specification with CatsIO {
     val configPath = Paths.get(getClass.getResource("/config.pubsub.hocon.sample").toURI)
     val expected = ConfigFile(
       io.Input.PubSub("projects/test-project/subscriptions/inputSub", None, None),
-      io.Output.PubSub("projects/test-project/topics/good-topic", Some(Set("app_id")), None, None, None, None),
-      Some(io.Output.PubSub("projects/test-project/topics/pii-topic", None, None, None, None, None)),
-      io.Output.PubSub("projects/test-project/topics/bad-topic", None, None, None, None, None),
+      io.Outputs(
+        io.Output.PubSub("projects/test-project/topics/good-topic", Some(Set("app_id")), None, None, None, None),
+        Some(io.Output.PubSub("projects/test-project/topics/pii-topic", None, None, None, None, None)),
+        io.Output.PubSub("projects/test-project/topics/bad-topic", None, None, None, None, None)
+      ),
       io.Concurrency(256, 3),
       Some(7.days),
       Some(
@@ -66,18 +68,29 @@ class ConfigFileSpec extends Specification with CatsIO {
         None,
         None
       ),
-      io.Output.Kinesis(
-        "enriched",
-        Some("eu-central-1"),
-        None,
-        io.Output.BackoffPolicy(100.millis, 10.seconds),
-        100.millis,
-        io.Output.Collection(500, 5242880),
-        None
-      ),
-      Some(
+      io.Outputs(
         io.Output.Kinesis(
-          "pii",
+          "enriched",
+          Some("eu-central-1"),
+          None,
+          io.Output.BackoffPolicy(100.millis, 10.seconds),
+          100.millis,
+          io.Output.Collection(500, 5242880),
+          None
+        ),
+        Some(
+          io.Output.Kinesis(
+            "pii",
+            Some("eu-central-1"),
+            None,
+            io.Output.BackoffPolicy(100.millis, 10.seconds),
+            100.millis,
+            io.Output.Collection(500, 5242880),
+            None
+          )
+        ),
+        io.Output.Kinesis(
+          "bad",
           Some("eu-central-1"),
           None,
           io.Output.BackoffPolicy(100.millis, 10.seconds),
@@ -86,15 +99,6 @@ class ConfigFileSpec extends Specification with CatsIO {
           100.millis,
           io.Output.Collection(500, 5242880),
           None
         )
       ),
-      io.Output.Kinesis(
-        "bad",
-        Some("eu-central-1"),
-        None,
-        io.Output.BackoffPolicy(100.millis, 10.seconds),
-        100.millis,
-        io.Output.Collection(500, 5242880),
-        None
-      ),
       io.Concurrency(256, 1),
       Some(7.days),
       Some(
@@ -120,17 +124,19 @@ class ConfigFileSpec extends Specification with CatsIO {
         "type": "PubSub",
         "subscription": "projects/test-project/subscriptions/inputSub"
       },
-      "good": {
-        "type": "PubSub",
-        "topic": "projects/test-project/topics/good-topic"
-      },
-      "pii": {
-        "type": "PubSub",
-        "topic": "projects/test-project/topics/pii-topic"
-      },
-      "bad": {
-        "type": "PubSub",
-        "topic": "projects/test-project/topics/bad-topic"
+      "output": {
+        "good": {
+          "type": "PubSub",
+          "topic": "projects/test-project/topics/good-topic"
+        },
+        "pii": {
+          "type": "PubSub",
+          "topic": "projects/test-project/topics/pii-topic"
+        },
+        "bad": {
+          "type": "PubSub",
+          "topic": "projects/test-project/topics/bad-topic"
+        }
       },
       "concurrency": {
         "enrich": 256,
"PubSub", + "topic": "projects/test-project/topics/good-topic" + }, + "pii": { + "type": "PubSub", + "topic": "projects/test-project/topics/pii-topic" + }, + "bad": { + "type": "PubSub", + "topic": "projects/test-project/topics/bad-topic" + } }, "concurrency": { "enrich": 256, diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigsSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigsSpec.scala index e88de074b..831b4ee2e 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigsSpec.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigsSpec.scala @@ -32,9 +32,11 @@ class ParsedConfigsSpec extends Specification with CatsIO { val configFile = ConfigFile( io.Input.PubSub("projects/test-project/subscriptions/inputSub", None, None), - io.Output.PubSub("projects/test-project/topics/good-topic", Some(Set("app_id", invalidAttr1)), None, None, None, None), - Some(io.Output.PubSub("projects/test-project/topics/pii-topic", Some(Set("app_id", invalidAttr2)), None, None, None, None)), - io.Output.PubSub("projects/test-project/topics/bad-topic", None, None, None, None, None), + io.Outputs( + io.Output.PubSub("projects/test-project/topics/good-topic", Some(Set("app_id", invalidAttr1)), None, None, None, None), + Some(io.Output.PubSub("projects/test-project/topics/pii-topic", Some(Set("app_id", invalidAttr2)), None, None, None, None)), + io.Output.PubSub("projects/test-project/topics/bad-topic", None, None, None, None, None) + ), io.Concurrency(10000, 64), Some(7.days), Some( diff --git a/modules/kinesis/src/main/resources/application.conf b/modules/kinesis/src/main/resources/application.conf index bf74591e2..2ac7c187f 100644 --- a/modules/kinesis/src/main/resources/application.conf +++ b/modules/kinesis/src/main/resources/application.conf @@ -11,42 +11,48 @@ } } - "good": { - "type": "Kinesis" - "backoffPolicy": { - "minBackoff": 100 milliseconds - "maxBackoff": 10 seconds - } - "maxBufferedTime": 100 milliseconds - "collection": { - "maxCount": 500 - "maxSize": 5242880 + "output": { + "good": { + "type": "Kinesis" + "backoffPolicy": { + "minBackoff": 100 milliseconds + "maxBackoff": 10 seconds + } + "maxBufferedTime": 100 milliseconds + "collection": { + "maxCount": 500 + "maxSize": 5242880 + } } - } - "pii": { - "type": "Kinesis" - "backoffPolicy": { - "minBackoff": 100 milliseconds - "maxBackoff": 10 seconds + "pii": { + "type": "Kinesis" + # we need all the fields to exist to have defaults + "streamName": "" + # we need all the fields to exist to have defaults + "region": "" + "backoffPolicy": { + "minBackoff": 100 milliseconds + "maxBackoff": 10 seconds + } + "maxBufferedTime": 100 milliseconds + "collection": { + "maxCount": 500 + "maxSize": 5242880 + } } - "maxBufferedTime": 100 milliseconds - "collection": { - "maxCount": 500 - "maxSize": 5242880 - } - } - "bad": { - "type": "Kinesis" - "backoffPolicy": { - "minBackoff": 100 milliseconds - "maxBackoff": 10 seconds - } - "maxBufferedTime": 100 milliseconds - "collection": { - "maxCount": 500 - "maxSize": 5242880 + "bad": { + "type": "Kinesis" + "backoffPolicy": { + "minBackoff": 100 milliseconds + "maxBackoff": 10 seconds + } + "maxBufferedTime": 100 milliseconds + "collection": { + "maxCount": 500 + "maxSize": 5242880 + } } }