diff --git a/config/config.kinesis.minimal.hocon b/config/config.kinesis.minimal.hocon
index 7d1af227a..d6ca423e2 100644
--- a/config/config.kinesis.minimal.hocon
+++ b/config/config.kinesis.minimal.hocon
@@ -3,11 +3,13 @@
     "streamName": "collector-payloads"
   }

-  "good": {
-    "streamName": "enriched"
-  }
+  "output": {
+    "good": {
+      "streamName": "enriched"
+    }

-  "bad": {
-    "streamName": "bad"
+    "bad": {
+      "streamName": "bad"
+    }
   }
 }
diff --git a/config/config.kinesis.reference.hocon b/config/config.kinesis.reference.hocon
index 15cc6e7d0..6f2cf17bd 100644
--- a/config/config.kinesis.reference.hocon
+++ b/config/config.kinesis.reference.hocon
@@ -59,239 +59,241 @@
     # cloudwatchCustomEndpoint = "http://localhost:4582"
   }

-  # Enriched events output
-  "good": {
-    "type": "Kinesis"
-
-    # Name of the Kinesis stream to write to
-    "streamName": "enriched"
-
-    # Optional. Region where the Kinesis stream is located
-    # This field is optional if it can be resolved with AWS region provider chain.
-    # It checks places like env variables, system properties, AWS profile file.
-    # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
-    "region": "eu-central-1"
+  "output": {
+    # Enriched events output
+    "good": {
+      "type": "Kinesis"
+
+      # Name of the Kinesis stream to write to
+      "streamName": "enriched"
+
+      # Optional. Region where the Kinesis stream is located
+      # This field is optional if it can be resolved with AWS region provider chain.
+      # It checks places like env variables, system properties, AWS profile file.
+      # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
+      "region": "eu-central-1"
+
+      # Optional. How the output stream/topic will be partitioned in Kinesis
+      # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
+      # user_ipaddress, domain_sessionid, user_fingerprint
+      # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
+      # possible partition keys correspond to.
+      # Otherwise, the partition key will be a random UUID.
+      # "partitionKey" = "user_id"
+
+      # Optional. Minimum and maximum backoff periods
+      "backoffPolicy": {
+        "minBackoff": 100 milliseconds
+        "maxBackoff": 10 seconds
+      }

-    # Optional. How the output stream/topic will be partitioned in Kinesis
-    # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
-    # user_ipaddress, domain_sessionid, user_fingerprint
-    # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
-    # possible parittion keys correspond to.
-    # Otherwise, the partition key will be a random UUID.
-    # "partitionKey" = "user_id"
-
-    # Optional. Minimum and maximum backoff periods
-    "backoffPolicy": {
-      "minBackoff": 100 milliseconds
-      "maxBackoff": 10 seconds
-    }
+      # Optional. Maximum amount of time an enriched event may spend being buffered before it gets sent
+      "maxBufferedTime": 100 millis

-    # Optional. Maximum amount of time an enriched event may spend being buffered before it gets sent
-    "maxBufferedTime": 100 millis
+      # Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
+      "collection": {
+        # Maximum number of Kinesis records to pack into a PutRecords request
+        "maxCount": 500

-    # Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
-    "collection": {
-      # Maximum number of Kinesis records to pack into a PutRecords request
-      "maxCount": 500
+        # Maximum amount of data to send with a PutRecords request
+        "maxSize": 5242880
+      }

-      # Maximum amount of data to send with a PutRecords request
-      "maxSize": 5242880
+      # Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
+      # If not specified, aggregation is deactivated
+      #"aggregation": {
+      #  # Maximum number of enriched events to pack into an aggregated Kinesis record
+      #  "maxCount": 4294967295
+      #
+      #  # Maximum number of bytes to pack into an aggregated Kinesis record
+      #  "maxSize": 51200
+      #}
+
+      # Optional. Maximum number of connections to open in the backend.
+      # HTTP requests are sent in parallel over multiple connections.
+      # Setting this too high may impact latency and consume additional resources
+      # without increasing throughput
+      "maxConnections": 24
+
+      # Optional. Minimum level of logs for the native KPL daemon.
+      # Logs show up on stderr
+      # Possible values: trace | debug | info | warning | error
+      "logLevel": "warning"
+
+      # Optional. Use a custom Kinesis endpoint.
+      # Note this does not accept protocols or paths, only host names or ip addresses.
+      # There is no way to disable TLS.
+      # Needs to be specified along with customPort
+      # "customEndpoint": "localhost"
+
+      # Optional. Server port to connect to for Kinesis.
+      # Needs to be specified along with customEndpoint
+      # "customPort": 4566
+
+      # Optional. Use a custom Cloudwatch endpoint.
+      # Note this does not accept protocols or paths, only host names or ip addresses.
+      # There is no way to disable TLS
+      # Needs to be specified along with cloudwatchPort
+      # "cloudwatchEndpoint": "localhost"
+
+      # Optional. Server port to connect to for CloudWatch.
+      # Needs to be specified along with cloudwatchEndpoint
+      # "cloudwatchPort": 4582
     }

-    # Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
-    # If not specified, aggregation is deactivated
-    #"aggregation": {
-    #  # Maximum number of enriched events to pack into an aggregated Kinesis record
-    #  "maxCount": 4294967295
-    #
-    #  # Maximum number of bytes to pack into an aggregated Kinesis record
-    #  "maxSize": 51200
-    #}
-
-    # Optional. Maximum number of connections to open in the backend.
-    # HTTP requests are sent in parallel over multiple connections.
-    # Setting this too high may impact latency and consume additional resources
-    # without increasing throughput
-    "maxConnections": 24
-
-    # Optional. Minimum level of logs for the native KPL daemon.
-    # Logs show up on stderr
-    # Possible values: trace | debug | info | warning | error
-    "logLevel": "warning"
-
-    # Optional. Use a custom Kinesis endpoint.
-    # Note this does not accept protocols or paths, only host names or ip addresses.
-    # There is no way to disable TLS.
-    # Needs to be specified along with customPort
-    # "customEndpoint": "localhost"
-
-    # Optional. Server port to connect to for Kinesis.
-    # Needs to be specified along with customEndpoint
-    # "customPort": 4566
-
-    # Optional. Use a custom Cloudwatch endpoint.
-    # Note this does not accept protocols or paths, only host names or ip addresses.
-    # There is no way to disable TLS
-    # Needs to be specified along with cloudwatchPort
-    # "cloudwatchEndpoint": "localhost"
-
-    # Optional. Server port to connect to for CloudWatch.
-    # Needs to be specified along with cloudwatchPort
-    # "cloudwatchPort": 4582
-  }
-
-  # Pii events output
-  "pii": {
-    "type": "Kinesis"
-
-    # Name of the Kinesis stream to write to
-    "streamName": "pii"
-
-    # Optional. Region where the Kinesis stream is located
-    # This field is optional if it can be resolved with AWS region provider chain.
-    # It checks places like env variables, system properties, AWS profile file.
-    # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
-    "region": "eu-central-1"
+    # Pii events output
+    "pii": {
+      "type": "Kinesis"
+
+      # Name of the Kinesis stream to write to
+      "streamName": "pii"
+
+      # Optional. Region where the Kinesis stream is located
+      # This field is optional if it can be resolved with AWS region provider chain.
+      # It checks places like env variables, system properties, AWS profile file.
+      # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
+      "region": "eu-central-1"
+
+      # Optional. How the output stream/topic will be partitioned in Kinesis
+      # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
+      # user_ipaddress, domain_sessionid, user_fingerprint
+      # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
+      # possible partition keys correspond to.
+      # Otherwise, the partition key will be a random UUID.
+      # "partitionKey" = "user_id"
+
+      # Optional. Minimum and maximum backoff periods
+      "backoffPolicy": {
+        "minBackoff": 100 milliseconds
+        "maxBackoff": 10 seconds
+      }

-    # Optional. How the output stream/topic will be partitioned in Kinesis
-    # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
-    # user_ipaddress, domain_sessionid, user_fingerprint
-    # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
-    # possible parittion keys correspond to.
-    # Otherwise, the partition key will be a random UUID.
-    # "partitionKey" = "user_id"
-
-    # Optional. Minimum and maximum backoff periods
-    "backoffPolicy": {
-      "minBackoff": 100 milliseconds
-      "maxBackoff": 10 seconds
-    }
+      # Optional. Maximum amount of time an enriched event may spend being buffered before it gets sent
+      "maxBufferedTime": 100 millis

-    # Optional. Maximum amount of time an enriched event may spend being buffered before it gets sent
-    "maxBufferedTime": 100 millis
+      # Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
+      "collection": {
+        # Maximum number of Kinesis records to pack into a PutRecords request
+        "maxCount": 500

-    # Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
-    "collection": {
-      # Maximum number of Kinesis records to pack into a PutRecords request
-      "maxCount": 500
+        # Maximum amount of data to send with a PutRecords request
+        "maxSize": 5242880
+      }

-      # Maximum amount of data to send with a PutRecords request
-      "maxSize": 5242880
+      # Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
+      # If not specified, aggregation is deactivated
+      #"aggregation": {
+      #  # Maximum number of enriched events to pack into an aggregated Kinesis record
+      #  "maxCount": 4294967295
+      #
+      #  # Maximum number of bytes to pack into an aggregated Kinesis record
+      #  "maxSize": 51200
+      #}
+
+      # Optional. Maximum number of connections to open in the backend.
+      # HTTP requests are sent in parallel over multiple connections.
+      # Setting this too high may impact latency and consume additional resources
+      # without increasing throughput
+      "maxConnections": 24
+
+      # Optional. Minimum level of logs for the native KPL daemon.
+      # Logs show up on stderr
+      # Possible values: trace | debug | info | warning | error
+      "logLevel": "warning"
+
+      # Optional. Use a custom Kinesis endpoint.
+      # Note this does not accept protocols or paths, only host names or ip addresses.
+      # There is no way to disable TLS.
+      # Needs to be specified along with customPort
+      # "customEndpoint": "localhost"
+
+      # Optional. Server port to connect to for Kinesis.
+      # Needs to be specified along with customEndpoint
+      # "customPort": 4566
+
+      # Optional. Use a custom Cloudwatch endpoint.
+      # Note this does not accept protocols or paths, only host names or ip addresses.
+      # There is no way to disable TLS
+      # Needs to be specified along with cloudwatchPort
+      # "cloudwatchEndpoint": "localhost"
+
+      # Optional. Server port to connect to for CloudWatch.
+      # Needs to be specified along with cloudwatchEndpoint
+      # "cloudwatchPort": 4582
+    }

-    # Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
-    # If not specified, aggregation is deactivated
-    #"aggregation": {
-    #  # Maximum number of enriched events to pack into an aggregated Kinesis record
-    #  "maxCount": 4294967295
-    #
-    #  # Maximum number of bytes to pack into an aggregated Kinesis record
-    #  "maxSize": 51200
-    #}
-
-    # Optional. Maximum number of connections to open in the backend.
-    # HTTP requests are sent in parallel over multiple connections.
-    # Setting this too high may impact latency and consume additional resources
-    # without increasing throughput
-    "maxConnections": 24
-
-    # Optional. Minimum level of logs for the native KPL daemon.
-    # Logs show up on stderr
-    # Possible values: trace | debug | info | warning | error
-    "logLevel": "warning"
-
-    # Optional. Use a custom Kinesis endpoint.
-    # Note this does not accept protocols or paths, only host names or ip addresses.
-    # There is no way to disable TLS.
-    # Needs to be specified along with customPort
-    # "customEndpoint": "localhost"
-
-    # Optional. Server port to connect to for Kinesis.
-    # Needs to be specified along with customEndpoint
-    # "customPort": 4566
-
-    # Optional. Use a custom Cloudwatch endpoint.
-    # Note this does not accept protocols or paths, only host names or ip addresses.
-    # There is no way to disable TLS
-    # Needs to be specified along with cloudwatchPort
-    # "cloudwatchEndpoint": "localhost"
-
-    # Optional. Server port to connect to for CloudWatch.
-    # Needs to be specified along with cloudwatchPort
-    # "cloudwatchPort": 4582
-  }
+    # Bad rows output
+    "bad": {
+      "type": "Kinesis"

-  # Bad rows output
-  "bad": {
-    "type": "Kinesis"
+      # Name of the Kinesis stream to write to
+      "streamName": "bad"

-    # Name of the Kinesis stream to write to
-    "streamName": "bad"
+      # Optional. Region where the Kinesis stream is located
+      # This field is optional if it can be resolved with AWS region provider chain.
+      # It checks places like env variables, system properties, AWS profile file.
+      # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
+      "region": "eu-central-1"

-    # Optional. Region where the Kinesis stream is located
-    # This field is optional if it can be resolved with AWS region provider chain.
-    # It checks places like env variables, system properties, AWS profile file.
-    # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
-    "region": "eu-central-1"
+      # Optional. Minimum and maximum backoff periods
+      "backoffPolicy": {
+        "minBackoff": 100 milliseconds
+        "maxBackoff": 10 seconds
+      }

-    # Optional. Minimum and maximum backoff periods
-    "backoffPolicy": {
-      "minBackoff": 100 milliseconds
-      "maxBackoff": 10 seconds
-    }
+      # Optional. Maximum amount of time an enriched event may spend being buffered before it gets sent
+      "maxBufferedTime": 100 millis

-    # Optional. Maximum amount of time an enriched event may spend being buffered before it gets sent
-    "maxBufferedTime": 100 millis
+      # Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
+      "collection": {
+        # Maximum number of Kinesis records to pack into a PutRecords request
+        "maxCount": 500

-    # Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
-    "collection": {
-      # Maximum number of Kinesis records to pack into a PutRecords request
-      "maxCount": 500
+        # Maximum amount of data to send with a PutRecords request
+        "maxSize": 5242880
+      }

-      # Maximum amount of data to send with a PutRecords request
-      "maxSize": 5242880
+      # Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
+      # If not specified, aggregation is deactivated
+      #"aggregation": {
+      #  # Maximum number of enriched events to pack into an aggregated Kinesis record
+      #  "maxCount": 4294967295
+      #
+      #  # Maximum number of bytes to pack into an aggregated Kinesis record
+      #  "maxSize": 51200
+      #}
+
+      # Optional. Maximum number of connections to open in the backend.
+      # HTTP requests are sent in parallel over multiple connections.
+      # Setting this too high may impact latency and consume additional resources
+      # without increasing throughput
+      "maxConnections": 24
+
+      # Optional. Minimum level of logs for the native KPL daemon.
+      # Logs show up on stderr
+      # Possible values: trace | debug | info | warning | error
+      "logLevel": "warning"
+
+      # Optional. Use a custom Kinesis endpoint.
+      # Note this does not accept protocols or paths, only host names or ip addresses.
+      # There is no way to disable TLS.
+      # Needs to be specified along with customPort
+      # "customEndpoint": "localhost"
+
+      # Optional. Server port to connect to for Kinesis.
+      # Needs to be specified along with customEndpoint
+      # "customPort": 4566
+
+      # Optional. Use a custom Cloudwatch endpoint.
+      # Note this does not accept protocols or paths, only host names or ip addresses.
+      # There is no way to disable TLS
+      # Needs to be specified along with cloudwatchPort
+      # "cloudwatchEndpoint": "localhost"
+
+      # Optional. Server port to connect to for CloudWatch.
+      # Needs to be specified along with cloudwatchEndpoint
+      # "cloudwatchPort": 4582
+    }

-    # Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
-    # If not specified, aggregation is deactivated
-    #"aggregation": {
-    #  # Maximum number of enriched events to pack into an aggregated Kinesis record
-    #  "maxCount": 4294967295
-    #
-    #  # Maximum number of bytes to pack into an aggregated Kinesis record
-    #  "maxSize": 51200
-    #}
-
-    # Optional. Maximum number of connections to open in the backend.
-    # HTTP requests are sent in parallel over multiple connections.
-    # Setting this too high may impact latency and consume additional resources
-    # without increasing throughput
-    "maxConnections": 24
-
-    # Optional. Minimum level of logs for the native KPL daemon.
-    # Logs show up on stderr
-    # Possible values: trace | debug | info | warning | error
-    "logLevel": "warning"
-
-    # Optional. Use a custom Kinesis endpoint.
-    # Note this does not accept protocols or paths, only host names or ip addresses.
-    # There is no way to disable TLS.
-    # Needs to be specified along with customPort
-    # "customEndpoint": "localhost"
-
-    # Optional. Server port to connect to for Kinesis.
-    # Needs to be specified along with customEndpoint
-    # "customPort": 4566
-
-    # Optional. Use a custom Cloudwatch endpoint.
-    # Note this does not accept protocols or paths, only host names or ip addresses.
-    # There is no way to disable TLS
-    # Needs to be specified along with cloudwatchPort
-    # "cloudwatchEndpoint": "localhost"
-
-    # Optional. Server port to connect to for CloudWatch.
-    # Needs to be specified along with cloudwatchPort
-    # "cloudwatchPort": 4582
   }
-
-
   # Optional. Concurrency of the app
diff --git a/config/config.pubsub.hocon.sample b/config/config.pubsub.hocon.sample
index 3625a654a..541c745db 100644
--- a/config/config.pubsub.hocon.sample
+++ b/config/config.pubsub.hocon.sample
@@ -9,43 +9,45 @@
     # "dir": "/var/collector"
   }

-  # Enriched events output
-  "good": {
-    "type": "PubSub"
-    "topic": "projects/test-project/topics/good-topic"
-
-    # Enriched event fields to add as PubSub message attributes.
-    "attributes": [ "app_id" ]
-
-    # Local FS supported for testing purposes
-    # "type": "FileSystem"
-    # "file": "/var/enriched"
-    # "maxBytes": 1000000
-  }
+  "output": {
+    # Enriched events output
+    "good": {
+      "type": "PubSub"
+      "topic": "projects/test-project/topics/good-topic"
+
+      # Enriched event fields to add as PubSub message attributes.
+      "attributes": [ "app_id" ]
+
+      # Local FS supported for testing purposes
+      # "type": "FileSystem"
+      # "file": "/var/enriched"
+      # "maxBytes": 1000000
+    }

-  # Pii events output
-  "pii": {
-    "type": "PubSub"
-    "topic": "projects/test-project/topics/pii-topic"
+    # Pii events output
+    "pii": {
+      "type": "PubSub"
+      "topic": "projects/test-project/topics/pii-topic"

-    # Enriched event fields to add as PubSub message attributes.
-    # "attributes": [ "app_id" ]
+      # Enriched event fields to add as PubSub message attributes.
+      # "attributes": [ "app_id" ]

-    # Local FS supported for testing purposes
-    # "type": "FileSystem"
-    # "file": "/var/pii"
-    # "maxBytes": 1000000
-  }
+      # Local FS supported for testing purposes
+      # "type": "FileSystem"
+      # "file": "/var/pii"
+      # "maxBytes": 1000000
+    }

-  # Bad rows output
-  "bad": {
-    "type": "PubSub"
-    "topic": "projects/test-project/topics/bad-topic"
+    # Bad rows output
+    "bad": {
+      "type": "PubSub"
+      "topic": "projects/test-project/topics/bad-topic"

-    # Local FS supported for testing purposes
-    # "type": "FileSystem"
-    # "file": "/var/bad"
-    # "maxBytes": 1000000
+      # Local FS supported for testing purposes
+      # "type": "FileSystem"
+      # "file": "/var/bad"
+      # "maxBytes": 1000000
+    }
   }

   # Optional. Concurrency of the app
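A note on how the sample files above interact with the defaults: HOCON resolves any key missing from the user's config by falling back to the packaged application.conf (see the modules/kinesis diff at the end of this patch), which is why that file now has to declare every pii field even when empty. A minimal, self-contained sketch of the merge semantics with Typesafe Config — the keys and values below are illustrative stand-ins, not the project's actual defaults:

import com.typesafe.config.ConfigFactory

object FallbackSketch extends App {
  // Stand-in for the packaged application.conf defaults, trimmed to two outputs
  val defaults = ConfigFactory.parseString(
    """output {
      |  pii { streamName = "", region = "" }
      |  bad { streamName = "bad-default" }
      |}""".stripMargin
  )

  // Stand-in for a user-supplied file like config.kinesis.minimal.hocon
  val user = ConfigFactory.parseString(
    """output {
      |  good { streamName = "enriched" }
      |  bad { streamName = "bad" }
      |}""".stripMargin
  )

  // withFallback merges key by key: user values win, defaults fill the gaps
  val merged = user.withFallback(defaults)
  println(merged.getString("output.bad.streamName")) // "bad" (user wins)
  println(merged.getString("output.pii.streamName")) // ""    (default survives)
}

A default can only fill a gap if the key exists on the fallback side, hence the empty streamName and region entries added to the pii output in application.conf further down.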
diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala
index 69a8c0fc2..ce99434f4 100644
--- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala
+++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala
@@ -69,13 +69,13 @@ object Run {
       _ <- Logger[F].info(s"Initialising resources for $name $version")
       processor = Processor(name, version)
       file = parsed.configFile
-      sinkGood = initAttributedSink(blocker, file.good, file.monitoring, mkSinkGood)
-      sinkPii = file.pii.map(out => initAttributedSink(blocker, out, file.monitoring, mkSinkPii))
-      sinkBad = file.bad match {
+      sinkGood = initAttributedSink(blocker, file.output.good, file.monitoring, mkSinkGood)
+      sinkPii = file.output.pii.map(out => initAttributedSink(blocker, out, file.monitoring, mkSinkPii))
+      sinkBad = file.output.bad match {
                   case f: Output.FileSystem =>
                     Sink.fileSink[F](f, blocker)
                   case _ =>
-                    mkSinkBad(blocker, file.bad, file.monitoring)
+                    mkSinkBad(blocker, file.output.bad, file.monitoring)
                 }
       clients = mkClients.map(mk => mk(blocker))
       exit <- file.input match {
diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFile.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFile.scala
index cfd8234da..8587c0900 100644
--- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFile.scala
+++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFile.scala
@@ -27,23 +27,19 @@ import pureconfig.ConfigSource
 import pureconfig.module.catseffect.syntax._
 import pureconfig.module.circe._

-import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Concurrency, Input, Monitoring, Output}
+import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Concurrency, Input, Monitoring, Output, Outputs}

 /**
  * Parsed HOCON configuration file
  *
  * @param input input (PubSub, Kinesis etc)
- * @param good good enriched output (PubSub, Kinesis, FS etc)
- * @param pii pii enriched output (PubSub, Kinesis, FS etc)
- * @param bad bad rows output (PubSub, Kinesis, FS etc)
+ * @param output wraps the good, pii and bad outputs (PubSub, Kinesis, FS etc)
  * @param assetsUpdatePeriod time after which assets should be updated, in minutes
  * @param monitoring configuration for sentry and metrics
  */
 final case class ConfigFile(
   input: Input,
-  good: Output,
-  pii: Option[Output],
-  bad: Output,
+  output: Outputs,
   concurrency: Concurrency,
   assetsUpdatePeriod: Option[FiniteDuration],
   monitoring: Option[Monitoring]
@@ -57,8 +53,11 @@ object ConfigFile {

   implicit val configFileDecoder: Decoder[ConfigFile] =
     deriveConfiguredDecoder[ConfigFile].emap {
-      case ConfigFile(_, _, _, _, _, Some(aup), _) if aup._1 <= 0L =>
+      case ConfigFile(_, _, _, Some(aup), _) if aup._1 <= 0L =>
         "assetsUpdatePeriod in config file cannot be less than 0".asLeft // TODO: use newtype
+      // Remove the pii output if both its streamName and region are empty
+      case c @ ConfigFile(_, Outputs(good, Some(Output.Kinesis(s, r, _, _, _, _, _, _, _, _, _, _, _)), bad), _, _, _)
+          if (s.isEmpty && r.isEmpty) => c.copy(output = Outputs(good, None, bad)).asRight
       case other => other.asRight
     }
   implicit val configFileEncoder: Encoder[ConfigFile] =
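The decoder change above is the subtle part of this file: the packaged defaults guarantee that a pii output object always exists, so this new case is what turns the "both fields empty" placeholder back into pii = None. A self-contained sketch of that behaviour using plain circe, with simplified stand-ins for the real Output/Outputs types (the real Output is a sealed trait whose Kinesis case carries many more fields):

import io.circe.Decoder
import io.circe.generic.semiauto.deriveDecoder
import io.circe.parser.decode

object PiiRemovalSketch extends App {
  // Simplified stand-ins: just enough structure to show the decoding step
  final case class Output(streamName: String, region: Option[String])
  final case class Outputs(good: Output, pii: Option[Output], bad: Output)
  final case class ConfigFile(output: Outputs)

  implicit val outputDecoder: Decoder[Output] = deriveDecoder
  implicit val outputsDecoder: Decoder[Outputs] = deriveDecoder
  implicit val configFileDecoder: Decoder[ConfigFile] =
    deriveDecoder[ConfigFile].map {
      // mirrors the new case in ConfigFile.scala: drop the pii output
      // when its streamName is empty and no region is set
      case c @ ConfigFile(Outputs(good, Some(Output(s, r)), bad)) if s.isEmpty && r.isEmpty =>
        c.copy(output = Outputs(good, None, bad))
      case other => other
    }

  val json =
    """{"output": {
      |  "good": {"streamName": "enriched", "region": "eu-central-1"},
      |  "pii": {"streamName": ""},
      |  "bad": {"streamName": "bad", "region": "eu-central-1"}
      |}}""".stripMargin

  println(decode[ConfigFile](json).map(_.output.pii)) // Right(None)
}

Under these assumptions the placeholder pii block simply disappears from the parsed config instead of tripping the attribute validation downstream.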
diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigs.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigs.scala
index 8355baa39..7faf7128a 100644
--- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigs.scala
+++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigs.scala
@@ -74,8 +74,11 @@ object ParsedConfigs {
       }
       configFile <- ConfigFile.parse[F](config.config)
       configFile <- validateConfig[F](configFile)
-      goodAttributes = outputAttributes(configFile.good)
-      piiAttributes = configFile.pii.map(outputAttributes).getOrElse { _: EnrichedEvent => Map.empty[String, String] }
+      _ <- EitherT.liftF(
+             Logger[F].info(s"Parsed config file: $configFile")
+           )
+      goodAttributes = outputAttributes(configFile.output.good)
+      piiAttributes = configFile.output.pii.map(outputAttributes).getOrElse { _: EnrichedEvent => Map.empty[String, String] }
       client <- Client.parseDefault[F](igluJson).leftMap(x => show"Cannot decode Iglu Client. $x")
       _ <- EitherT.liftF(
              Logger[F].info(show"Parsed Iglu Client with following registries: ${client.resolver.repos.map(_.config.name).mkString(", ")}")
@@ -87,8 +90,8 @@ object ParsedConfigs {
     } yield ParsedConfigs(igluJson, configs, configFile, goodAttributes, piiAttributes)

   private[config] def validateConfig[F[_]: Applicative](configFile: ConfigFile): EitherT[F, String, ConfigFile] = {
-    val goodCheck: ValidationResult[OutputConfig] = validateAttributes(configFile.good)
-    val optPiiCheck: ValidationResult[Option[OutputConfig]] = configFile.pii.map(validateAttributes).sequence
+    val goodCheck: ValidationResult[OutputConfig] = validateAttributes(configFile.output.good)
+    val optPiiCheck: ValidationResult[Option[OutputConfig]] = configFile.output.pii.map(validateAttributes).sequence

     (goodCheck, optPiiCheck)
       .mapN { case (_, _) => configFile }
diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala
index c052dc093..6e5b940bc 100644
--- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala
+++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala
@@ -163,6 +163,12 @@ object io {
       deriveConfiguredEncoder[Input]
   }

+  case class Outputs(good: Output, pii: Option[Output], bad: Output)
+  object Outputs {
+    implicit val outputsDecoder: Decoder[Outputs] = deriveConfiguredDecoder[Outputs]
+    implicit val outputsEncoder: Encoder[Outputs] = deriveConfiguredEncoder[Outputs]
+  }
+
   sealed trait Output

   object Output {
@@ -233,6 +239,8 @@ object io {
             case _ =>
               s"Topic must conform projects/project-name/topics/topic-name format, $top given".asLeft
           }
+        case Kinesis(s, r, _, _, _, _, _, _, _, _, _, _, _) if (s.isEmpty && r.nonEmpty) =>
+          "streamName needs to be set".asLeft
         case other => other.asRight
       }
       .emap {
diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/CliConfigSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/CliConfigSpec.scala
index 6395ba8e4..0230945ca 100644
--- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/CliConfigSpec.scala
+++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/CliConfigSpec.scala
@@ -42,18 +42,20 @@ class CliConfigSpec extends Specification with CatsIO {
           type = "PubSub"
           subscription = "projects/test-project/subscriptions/inputSub"
         }
-        good = {
-          type = "PubSub"
-          topic = "projects/test-project/topics/good-topic"
-        }
-        pii = {
-          type = "PubSub"
-          topic = "projects/test-project/topics/pii-topic"
-          attributes = [ "app_id", "platform" ]
-        }
-        bad = {
-          type = "PubSub"
-          topic = "projects/test-project/topics/bad-topic"
+        output = {
+          good = {
+            type = "PubSub"
+            topic = "projects/test-project/topics/good-topic"
+          }
+          pii = {
+            type = "PubSub"
+            topic = "projects/test-project/topics/pii-topic"
+            attributes = [ "app_id", "platform" ]
+          }
+          bad = {
+            type = "PubSub"
+            topic = "projects/test-project/topics/bad-topic"
+          }
         }
         concurrency = {
           enrich = 256
@@ -64,9 +66,11 @@ class CliConfigSpec extends Specification with CatsIO {

     val expected = ConfigFile(
       io.Input.PubSub("projects/test-project/subscriptions/inputSub", None, None),
-      io.Output.PubSub("projects/test-project/topics/good-topic", None, None, None, None, None),
-      Some(io.Output.PubSub("projects/test-project/topics/pii-topic", Some(Set("app_id", "platform")), None, None, None, None)),
-      io.Output.PubSub("projects/test-project/topics/bad-topic", None, None, None, None, None),
+      io.Outputs(
+        io.Output.PubSub("projects/test-project/topics/good-topic", None, None, None, None, None),
+        Some(io.Output.PubSub("projects/test-project/topics/pii-topic", Some(Set("app_id", "platform")), None, None, None, None)),
+        io.Output.PubSub("projects/test-project/topics/bad-topic", None, None, None, None, None)
+      ),
       io.Concurrency(256, 3),
       None,
       None
diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala
index c28082dc8..28bf5631e 100644
--- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala
+++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala
@@ -33,9 +33,11 @@ class ConfigFileSpec extends Specification with CatsIO {
     val configPath = Paths.get(getClass.getResource("/config.pubsub.hocon.sample").toURI)
     val expected = ConfigFile(
       io.Input.PubSub("projects/test-project/subscriptions/inputSub", None, None),
-      io.Output.PubSub("projects/test-project/topics/good-topic", Some(Set("app_id")), None, None, None, None),
-      Some(io.Output.PubSub("projects/test-project/topics/pii-topic", None, None, None, None, None)),
-      io.Output.PubSub("projects/test-project/topics/bad-topic", None, None, None, None, None),
+      io.Outputs(
+        io.Output.PubSub("projects/test-project/topics/good-topic", Some(Set("app_id")), None, None, None, None),
+        Some(io.Output.PubSub("projects/test-project/topics/pii-topic", None, None, None, None, None)),
+        io.Output.PubSub("projects/test-project/topics/bad-topic", None, None, None, None, None)
+      ),
       io.Concurrency(256, 3),
       Some(7.days),
       Some(
@@ -68,24 +70,41 @@ class ConfigFileSpec extends Specification with CatsIO {
         None,
         None
       ),
-      io.Output.Kinesis(
-        "enriched",
-        Some("eu-central-1"),
-        None,
-        io.Output.BackoffPolicy(100.millis, 10.seconds),
-        100.millis,
-        io.Output.Collection(500, 5242880),
-        None,
-        24,
-        "warning",
-        None,
-        None,
-        None,
-        None
-      ),
-      Some(
+      io.Outputs(
         io.Output.Kinesis(
-          "pii",
+          "enriched",
+          Some("eu-central-1"),
+          None,
+          io.Output.BackoffPolicy(100.millis, 10.seconds),
+          100.millis,
+          io.Output.Collection(500, 5242880),
+          None,
+          24,
+          "warning",
+          None,
+          None,
+          None,
+          None
+        ),
+        Some(
+          io.Output.Kinesis(
+            "pii",
+            Some("eu-central-1"),
+            None,
+            io.Output.BackoffPolicy(100.millis, 10.seconds),
+            100.millis,
+            io.Output.Collection(500, 5242880),
+            None,
+            24,
+            "warning",
+            None,
+            None,
+            None,
+            None
+          )
+        ),
+        io.Output.Kinesis(
+          "bad",
           Some("eu-central-1"),
           None,
           io.Output.BackoffPolicy(100.millis, 10.seconds),
@@ -100,21 +119,6 @@ class ConfigFileSpec extends Specification with CatsIO {
           100.millis,
           io.Output.Collection(500, 5242880),
           None,
          24,
           "warning",
           None,
           None,
           None,
           None
         )
       ),
-      io.Output.Kinesis(
-        "bad",
-        Some("eu-central-1"),
-        None,
-        io.Output.BackoffPolicy(100.millis, 10.seconds),
-        100.millis,
-        io.Output.Collection(500, 5242880),
-        None,
-        24,
-        "warning",
-        None,
-        None,
-        None,
-        None
-      ),
       io.Concurrency(256, 1),
       Some(7.days),
       Some(
@@ -140,17 +144,19 @@ class ConfigFileSpec extends Specification with CatsIO {
         "type": "PubSub",
         "subscription": "projects/test-project/subscriptions/inputSub"
       },
-      "good": {
-        "type": "PubSub",
-        "topic": "projects/test-project/topics/good-topic"
-      },
-      "pii": {
-        "type": "PubSub",
-        "topic": "projects/test-project/topics/pii-topic"
-      },
-      "bad": {
-        "type": "PubSub",
-        "topic": "projects/test-project/topics/bad-topic"
+      "output": {
+        "good": {
+          "type": "PubSub",
+          "topic": "projects/test-project/topics/good-topic"
+        },
+        "pii": {
+          "type": "PubSub",
+          "topic": "projects/test-project/topics/pii-topic"
+        },
+        "bad": {
+          "type": "PubSub",
+          "topic": "projects/test-project/topics/bad-topic"
+        }
       },
       "concurrency": {
         "enrich": 256,
diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigsSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigsSpec.scala
index e88de074b..831b4ee2e 100644
--- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigsSpec.scala
+++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigsSpec.scala
@@ -32,9 +32,11 @@ class ParsedConfigsSpec extends Specification with CatsIO {

     val configFile = ConfigFile(
       io.Input.PubSub("projects/test-project/subscriptions/inputSub", None, None),
-      io.Output.PubSub("projects/test-project/topics/good-topic", Some(Set("app_id", invalidAttr1)), None, None, None, None),
-      Some(io.Output.PubSub("projects/test-project/topics/pii-topic", Some(Set("app_id", invalidAttr2)), None, None, None, None)),
-      io.Output.PubSub("projects/test-project/topics/bad-topic", None, None, None, None, None),
+      io.Outputs(
+        io.Output.PubSub("projects/test-project/topics/good-topic", Some(Set("app_id", invalidAttr1)), None, None, None, None),
+        Some(io.Output.PubSub("projects/test-project/topics/pii-topic", Some(Set("app_id", invalidAttr2)), None, None, None, None)),
+        io.Output.PubSub("projects/test-project/topics/bad-topic", None, None, None, None, None)
+      ),
       io.Concurrency(10000, 64),
       Some(7.days),
       Some(
diff --git a/modules/kinesis/src/main/resources/application.conf b/modules/kinesis/src/main/resources/application.conf
index 18e272389..f23c1d8cb 100644
--- a/modules/kinesis/src/main/resources/application.conf
+++ b/modules/kinesis/src/main/resources/application.conf
@@ -12,49 +12,55 @@
     "bufferSize": 3
   }

-  "good": {
-    "type": "Kinesis"
-    "backoffPolicy": {
-      "minBackoff": 100 milliseconds
-      "maxBackoff": 10 seconds
-    }
-    "maxBufferedTime": 100 milliseconds
-    "collection": {
-      "maxCount": 500
-      "maxSize": 5242880
+  "output": {
+    "good": {
+      "type": "Kinesis"
+      "backoffPolicy": {
+        "minBackoff": 100 milliseconds
+        "maxBackoff": 10 seconds
+      }
+      "maxBufferedTime": 100 milliseconds
+      "collection": {
+        "maxCount": 500
+        "maxSize": 5242880
+      }
+      "maxConnections": 24
+      "logLevel": "warning"
     }
-    "maxConnections": 24
-    "logLevel": "warning"
-  }

-  "pii": {
-    "type": "Kinesis"
-    "backoffPolicy": {
-      "minBackoff": 100 milliseconds
-      "maxBackoff": 10 seconds
+    "pii": {
+      "type": "Kinesis"
+      # streamName and region have to exist (even if empty) for these defaults to apply;
+      # when both are left empty, the pii output is removed during config decoding
+      "streamName": ""
+      "region": ""
+      "backoffPolicy": {
+        "minBackoff": 100 milliseconds
+        "maxBackoff": 10 seconds
+      }
+      "maxBufferedTime": 100 milliseconds
+      "collection": {
+        "maxCount": 500
+        "maxSize": 5242880
+      }
+      "maxConnections": 24
+      "logLevel": "warning"
     }
-    "maxBufferedTime": 100 milliseconds
-    "collection": {
-      "maxCount": 500
-      "maxSize": 5242880
-    }
-    "maxConnections": 24
-    "logLevel": "warning"
-  }

-  "bad": {
-    "type": "Kinesis"
-    "backoffPolicy": {
-      "minBackoff": 100 milliseconds
-      "maxBackoff": 10 seconds
-    }
-    "maxBufferedTime": 100 milliseconds
-    "collection": {
-      "maxCount": 500
-      "maxSize": 5242880
+    "bad": {
+      "type": "Kinesis"
+      "backoffPolicy": {
+        "minBackoff": 100 milliseconds
+        "maxBackoff": 10 seconds
+      }
+      "maxBufferedTime": 100 milliseconds
+      "collection": {
+        "maxCount": 500
+        "maxSize": 5242880
+      }
+      "maxConnections": 24
+      "logLevel": "warning"
     }
-    "maxConnections": 24
-    "logLevel": "warning"
   }

 "concurrency" : {