diff --git a/config/config.pubsub.hocon.sample b/config/config.file.extended.hocon similarity index 68% rename from config/config.pubsub.hocon.sample rename to config/config.file.extended.hocon index 1bb5a6a25..227029a11 100644 --- a/config/config.pubsub.hocon.sample +++ b/config/config.file.extended.hocon @@ -1,52 +1,35 @@ +# For testing purposes only +# Can be used with either enrich-pubsub or enrich-kinesis + { - # Collector input "input": { - "type": "PubSub" - "subscription": "projects/test-project/subscriptions/inputSub" + "type": "FileSystem" - # Local FS supported for testing purposes - # "type": "FileSystem" - # "dir": "/var/collector" + # Directory containing collector payloads that are Thrift encoded + "dir": "/var/collector-payloads" } "output": { # Enriched events output "good": { - "type": "PubSub" - "topic": "projects/test-project/topics/good-topic" - - # Enriched event fields to add as PubSub message attributes. - "attributes": [ "app_id" ] - - # Local FS supported for testing purposes - # "type": "FileSystem" - # "file": "/var/enriched" - # "maxBytes": 1000000 + "type": "FileSystem" + "file": "/var/enriched" + "maxBytes": 1000000 } # Pii events output "pii": { - "type": "PubSub" - "topic": "projects/test-project/topics/pii-topic" - - # Enriched event fields to add as PubSub message attributes. - # "attributes": [ "app_id" ] - - # Local FS supported for testing purposes - # "type": "FileSystem" - # "file": "/var/pii" - # "maxBytes": 1000000 + "type": "FileSystem" + "file": "/var/pii" + "maxBytes": 1000000 } # Bad rows output "bad": { - "type": "PubSub" - "topic": "projects/test-project/topics/bad-topic" - # Local FS supported for testing purposes - # "type": "FileSystem" - # "file": "/var/bad" - # "maxBytes": 1000000 + "type": "FileSystem" + "file": "/var/bad" + "maxBytes": 1000000 } } @@ -58,7 +41,7 @@ "sink": 3 } - # Optional, period after which enrich assets should be checked for updates + # Optional. period after which enrich assets should be checked for updates # no assets will be updated if the key is absent "assetsUpdatePeriod": "7 days" @@ -72,9 +55,8 @@ # Optional, configure how metrics are reported "metrics": { - # Send metrics to a StatsD server on localhost + # Optional. Send metrics to a StatsD server "statsd": { - "hostname": "localhost" "port": 8125 @@ -90,7 +72,7 @@ # "prefix": "snowplow.enrich." } - # Log to stdout using Slf4j + # Optional. Log to stdout using Slf4j "stdout": { "period": "10 seconds" @@ -98,8 +80,9 @@ # "prefix": "snowplow.enrich." } + # Optional. Cloudwatch metrics for KPL + "cloudwatch": true } - } # Optional, configure telemetry @@ -141,5 +124,5 @@ # Version of the terraform module that deployed the app moduleVersion = 1.0.0 - } + } } diff --git a/config/config.hocon.sample b/config/config.hocon.sample deleted file mode 100644 index ec6851db5..000000000 --- a/config/config.hocon.sample +++ /dev/null @@ -1,223 +0,0 @@ -# Copyright (c) 2013-2021 Snowplow Analytics Ltd. All rights reserved. -# -# This program is licensed to you under the Apache License Version 2.0, and -# you may not use this file except in compliance with the Apache License -# Version 2.0. You may obtain a copy of the Apache License Version 2.0 at -# http://www.apache.org/licenses/LICENSE-2.0. -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the Apache License Version 2.0 is distributed on an "AS -# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. 
See the Apache License Version 2.0 for the specific language -# governing permissions and limitations there under. - -# This file (application.conf.example) contains a template with -# configuration options for Stream Enrich. - -enrich { - - streams { - - in { - # Stream/topic where the raw events to be enriched are located - raw = {{streamsInRaw}} - raw = ${?ENRICH_STREAMS_IN_RAW} - } - - out { - # Stream/topic where the events that were successfully enriched will end up - enriched = {{outEnriched}} - enriched = ${?ENRICH_STREAMS_OUT_ENRICHED} - # Stream/topic where the event that failed enrichment will be stored - bad = {{outBad}} - bad = ${?ENRICH_STREAMS_OUT_BAD} - # Stream/topic where the pii tranformation events will end up - pii = {{outPii}} - pii = ${?ENRICH_STREAMS_OUT_PII} - - # How the output stream/topic will be partitioned. - # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid, - # user_ipaddress, domain_sessionid, user_fingerprint. - # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the - # possible parittion keys correspond to. - # Otherwise, the partition key will be a random UUID. - # Note: Nsq does not make use of partition key. - partitionKey = {{partitionKeyName}} - partitionKey = ${?ENRICH_STREAMS_OUT_PARTITION_KEY} - } - - # Configuration shown is for Kafka, to use another uncomment the appropriate configuration - # and comment out the other - # To use stdin, comment or remove everything in the "enrich.streams.sourceSink" section except - # "enabled" which should be set to "stdin". - sourceSink { - # Sources / sinks currently supported are: - # 'kinesis' for reading Thrift-serialized records and writing enriched and bad events to a - # Kinesis stream - # 'kafka' for reading / writing to a Kafka topic - # 'nsq' for reading / writing to a Nsq topic - # 'stdin' for reading from stdin and writing to stdout and stderr - enabled = {{sinkType}} - enabled = ${?ENRICH_STREAMS_SOURCE_SINK_ENABLED} - - # Region where the streams are located (AWS region, pertinent to kinesis sink/source type) - # region = {{region}} - # region = ${?ENRICH_STREAMS_SOURCE_SINK_REGION} - - ## Optional endpoint url configuration to override aws kinesis endpoints, - ## this can be used to specify local endpoints when using localstack - # customEndpoint = {{kinesisEndpoint}} - # customEndpoint = ${?ENRICH_STREAMS_SOURCE_SINK_CUSTOM_ENDPOINT} - - ## Optional endpoint url configuration to override aws dyanomdb endpoints for Kinesis checkpoints lease table, - ## this can be used to specify local endpoints when using Localstack - # dynamodbCustomEndpoint = "http://localhost:4569" - # dynamodbCustomEndpoint = ${?ENRICH_DYNAMODB_CUSTOM_ENDPOINT} - - # Optional override to disable cloudwatch - # disableCloudWatch = true - # disableCloudWatch = ${?ENRICH_DISABLE_CLOUDWATCH} - - # AWS credentials - # If both are set to 'default', use the default AWS credentials provider chain. - # If both are set to 'iam', use AWS IAM Roles to provision credentials. 
- # If both are set to 'env', use env variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY - # aws { - # accessKey = iam - # accessKey = ${?ENRICH_STREAMS_SOURCE_SINK_AWS_ACCESS_KEY} - # secretKey = iam - # secretKey = ${?ENRICH_STREAMS_SOURCE_SINK_AWS_SECRET_KEY} - # } - - # GCP credentials - # Either provide path to service account file or set environment variable GOOGLE_APPLICATION_CREDENTIALS - # gcp { - # creds = {{googleApplicationCredentials}} - # creds = ${?GOOGLE_APPLICATION_CREDENTIALS} - # } - - # Maximum number of records to get from Kinesis per call to GetRecords - # maxRecords = 10000 - # maxRecords = ${?ENRICH_MAX_RECORDS} - - # LATEST: most recent data. - # TRIM_HORIZON: oldest available data. - # "AT_TIMESTAMP": Start from the record at or after the specified timestamp - # Note: This only effects the first run of this application on a stream. - # (pertinent to kinesis source type) - # initialPosition = TRIM_HORIZON - # initialPosition = ${?ENRICH_STREAMS_SOURCE_SINK_INITIAL_POSITION} - - # Need to be specified when initial-position is "AT_TIMESTAMP". - # Timestamp format need to be in "yyyy-MM-ddTHH:mm:ssZ". - # Ex: "2017-05-17T10:00:00Z" - # Note: Time need to specified in UTC. - initialTimestamp = "{{initialTimestamp}}" - initialTimestamp = ${?ENRICH_STREAMS_SOURCE_SINK_INITIAL_TIMESTAMP} - - # Minimum and maximum backoff periods, in milliseconds - backoffPolicy { - minBackoff = {{enrichStreamsOutMinBackoff}} - minBackoff = ${?ENRICH_STREAMS_SOURCE_SINK_BACKOFF_POLICY_MIN_BACKOFF} - maxBackoff = {{enrichStreamsOutMaxBackoff}} - maxBackoff = ${?ENRICH_STREAMS_SOURCE_SINK_BACKOFF_POLICY_MAX_BACKOFF} - } - - # Or Kafka (Comment out for other types) - brokers = "{{kafkaBrokers}}" - # Number of retries to perform before giving up on sending a record - retries = 0 - # The kafka producer has a variety of possible configuration options defined at - # https://kafka.apache.org/documentation/#producerconfigs - # Some values are set to other values from this config by default: - # "bootstrap.servers" -> brokers - # retries -> retries - # "buffer.memory" -> buffer.byteLimit - # "linger.ms" -> buffer.timeLimit - #producerConf { - # acks = all - # "key.serializer" = "org.apache.kafka.common.serialization.StringSerializer" - # "value.serializer" = "org.apache.kafka.common.serialization.StringSerializer" - #} - # The kafka consumer has a variety of possible configuration options defined at - # https://kafka.apache.org/documentation/#consumerconfigs - # Some values are set to other values from this config by default: - # "bootstrap.servers" -> brokers - # "group.id" -> appName - #consumerConf { - # "enable.auto.commit" = true - # "auto.commit.interval.ms" = 1000 - # "auto.offset.reset" = earliest - # "session.timeout.ms" = 30000 - # "key.deserializer" = "org.apache.kafka.common.serialization.StringDeserializer" - # "value.deserializer" = "org.apache.kafka.common.serialization.ByteArrayDeserializer" - #} - - # Or NSQ - ## Channel name for nsq source - ## If more than one application is reading from the same NSQ topic at the same time, - ## all of them must have the same channel name - #rawChannel = "{{nsqSourceChannelName}}" - ## Host name for nsqd - #host = "{{nsqHost}}" - ## TCP port for nsqd, 4150 by default - #port = {{nsqdPort}} - ## Host name for lookupd - #lookupHost = "{{lookupHost}}" - ## HTTP port for nsqlookupd, 4161 by default - #lookupPort = {{nsqlookupdPort}} - } - - # After enrichment, events are accumulated in a buffer before being sent to Kinesis/Kafka. 
- # Note: Buffering is not supported by NSQ. - # The buffer is emptied whenever: - # - the number of stored records reaches recordLimit or - # - the combined size of the stored records reaches byteLimit or - # - the time in milliseconds since it was last emptied exceeds timeLimit when - # a new event enters the buffer - buffer { - byteLimit = {{bufferByteThreshold}} - byteLimit = ${?ENRICH_STREAMS_BUFFER_BYTE_LIMIT} - recordLimit = {{bufferRecordThreshold}} # Not supported by Kafka; will be ignored - recordLimit = ${?ENRICH_STREAMS_BUFFER_RECORD_LIMIT} - timeLimit = {{bufferTimeThreshold}} - timeLimit = ${?ENRICH_STREAMS_BUFFER_TIME_LIMIT} - } - - # Used for a DynamoDB table to maintain stream state. - # Used as the Kafka consumer group ID. - # Used as the Google PubSub subscription name. - appName = "{{appName}}" - appName = ${?ENRICH_STREAMS_APP_NAME} - } - - # The setting below requires an adapter being ready, i.e.: https://github.com/snowplow-incubator/remote-adapter-example - # remoteAdapters = [ - # { - # vendor: "com.globeandmail" - # version: "v1" - # url: "http://remote-adapter-example:8995/sampleRemoteAdapter" - # connectionTimeout: 1000 - # readTimeout: 5000 - # } - # ] - - # Optional section for tracking endpoints - monitoring { - snowplow { - collectorUri = "{{collectorUri}}" - collectorUri = ${?ENRICH_MONITORING_COLLECTOR_URI} - collectorPort = 80 - collectorPort = ${?ENRICH_MONITORING_COLLECTOR_PORT} - appId = {{enrichAppName}} - appId = ${?ENRICH_MONITORING_APP_ID} - method = GET - method = ${?ENRICH_MONITORING_METHOD} - } - } - - # Optional section for Sentry - sentry { - dsn = ${?SENTRY_DSN} - } -} diff --git a/config/config.kinesis.reference.hocon b/config/config.kinesis.extended.hocon similarity index 98% rename from config/config.kinesis.reference.hocon rename to config/config.kinesis.extended.hocon index 3f6b638ec..499a78e21 100644 --- a/config/config.kinesis.reference.hocon +++ b/config/config.kinesis.extended.hocon @@ -352,30 +352,41 @@ # Optional, configure telemetry # All the fields are optional "telemetry": { + # Set to true to disable telemetry "disable": false + # Interval for the heartbeat event "interval": 15 minutes + # HTTP method used to send the heartbeat event - "method": "POST" + "method": POST + # URI of the collector receiving the heartbeat event - "collectorUri": "collector-g.snowplowanalytics.com" + "collectorUri": collector-g.snowplowanalytics.com + # Port of the collector receiving the heartbeat event "collectorPort": 443 + # Whether to use https or not "secure": true + # Identifier intended to tie events together across modules, # infrastructure and apps when used consistently - "userProvidedId": "my_pipeline" + "userProvidedId": my_pipeline + # ID automatically generated upon running a modules deployment script # Intended to identify each independent module, and the infrastructure it controls - "autoGeneratedId": "hfy67e5ydhtrd" + "autoGeneratedId": hfy67e5ydhtrd + # Unique identifier for the VM instance # Unique for each instance of the app running within a module - "instanceId": "665bhft5u6udjf" + instanceId = 665bhft5u6udjf + # Name of the terraform module that deployed the app - "moduleName": "enrich-kinesis-ce" + moduleName = enrich-kinesis-ce + # Version of the terraform module that deployed the app - "moduleVersion": "1.0.0" - } + moduleVersion = 1.0.0 + } } diff --git a/config/config.pubsub.extended.hocon b/config/config.pubsub.extended.hocon new file mode 100644 index 000000000..340b0c5e3 --- /dev/null +++ 
b/config/config.pubsub.extended.hocon @@ -0,0 +1,191 @@ +{ + # Collector input + "input": { + "type": "PubSub" + + # Name of the PubSub subscription with the collector payloads + "subscription": "projects/test-project/subscriptions/collector-payloads-sub" + + # Optional. Number of threads used internally by permutive library to handle incoming messages. + # These threads do very little "work" apart from writing the message to a concurrent Queue. + "parallelPullCount": 1 + + # Optional. Configures the "max outstanding element count" of pubSub. + # This is the principal way we control concurrency in the app; it puts an upper bound on the number + # of events in memory at once. An event counts towards this limit starting from when it received + # by the permutive library, until we ack it (after publishing to output). The value must be large + # enough that it does not cause the sink to block whilst it is waiting for a batch to be + # completed. + "maxQueueSize": 3000 + } + + "output": { + # Enriched events output + "good": { + "type": "PubSub" + + # Name of the PubSub topic that will receive the enriched events + "topic": "projects/test-project/topics/enriched" + + # Optional. Enriched event fields to add as PubSub message attributes. + "attributes": [ "app_id" ] + + # Optional. Delay threshold to use for batching. + # After this amount of time has elapsed, + # before maxBatchSize and maxBatchBytes have been reached, + # messages from the buffer will be sent. + "delayThreshold": 200 milliseconds + + # Optional. Maximum number of messages sent within a batch. + # When the buffer reaches this number of messages they are sent. + # PubSub maximum : 1000 + "maxBatchSize": 1000 + + # Optional. Maximum number of bytes sent within a batch. + # When the buffer reaches this size messages are sent. + # PubSub maximum : 10MB + "maxBatchBytes": 10000000 + } + + # Pii events output + "pii": { + "type": "PubSub" + + # Name of the PubSub topic that will receive the pii events + "topic": "projects/test-project/topics/pii" + + # Optional. Pii event fields to add as PubSub message attributes. + # "attributes": [ "app_id" ] + + # Optional. Delay threshold to use for batching. + # After this amount of time has elapsed, + # before maxBatchSize and maxBatchBytes have been reached, + # messages from the buffer will be sent. + "delayThreshold": 200 milliseconds + + # Optional. Maximum number of messages sent within a batch. + # When the buffer reaches this number of messages they are sent. + # PubSub maximum : 1000 + "maxBatchSize": 1000 + + # Optional. Maximum number of bytes sent within a batch. + # When the buffer reaches this size messages are sent. + # PubSub maximum : 10MB + "maxBatchBytes": 10000000 + } + + # Bad rows output + "bad": { + "type": "PubSub" + + # Name of the PubSub topic that will receive the bad rows + "topic": "projects/test-project/topics/bad" + + # Optional. Delay threshold to use for batching. + # After this amount of time has elapsed, + # before maxBatchSize and maxBatchBytes have been reached, + # messages from the buffer will be sent. + "delayThreshold": 200 milliseconds + + # Optional. Maximum number of messages sent within a batch. + # When the buffer reaches this number of messages they are sent. + # PubSub maximum : 1000 + "maxBatchSize": 1000 + + # Optional. Maximum number of bytes sent within a batch. + # When the buffer reaches this size messages are sent. + # PubSub maximum : 10MB + "maxBatchBytes": 10000000 + } + } + + # Optional. 
Concurrency of the app + "concurrency" : { + # Number of events that can get enriched at the same time within a chunk + "enrich": 256 + # Number of chunks that can get sunk at the same time + "sink": 3 + } + + # Optional. period after which enrich assets should be checked for updates + # no assets will be updated if the key is absent + "assetsUpdatePeriod": "7 days" + + "monitoring": { + + # Optional, for tracking runtime exceptions + "sentry": { + "dsn": "http://sentry.acme.com" + } + + # Optional, configure how metrics are reported + "metrics": { + + # Optional. Send metrics to a StatsD server + "statsd": { + "hostname": "localhost" + "port": 8125 + + # Required, how frequently to report metrics + "period": "10 seconds" + + # Any key-value pairs to be tagged on every StatsD metric + "tags": { + "app": enrich + } + + # Optional, override the default metric prefix + # "prefix": "snowplow.enrich." + } + + # Optional. Log to stdout using Slf4j + "stdout": { + "period": "10 seconds" + + # Optional, override the default metric prefix + # "prefix": "snowplow.enrich." + } + } + } + + # Optional, configure telemetry + # All the fields are optional + "telemetry": { + + # Set to true to disable telemetry + "disable": false + + # Interval for the heartbeat event + "interval": 15 minutes + + # HTTP method used to send the heartbeat event + "method": POST + + # URI of the collector receiving the heartbeat event + "collectorUri": collector-g.snowplowanalytics.com + + # Port of the collector receiving the heartbeat event + "collectorPort": 443 + + # Whether to use https or not + "secure": true + + # Identifier intended to tie events together across modules, + # infrastructure and apps when used consistently + "userProvidedId": my_pipeline + + # ID automatically generated upon running a modules deployment script + # Intended to identify each independent module, and the infrastructure it controls + "autoGeneratedId": hfy67e5ydhtrd + + # Unique identifier for the VM instance + # Unique for each instance of the app running within a module + instanceId = 665bhft5u6udjf + + # Name of the terraform module that deployed the app + moduleName = enrich-kinesis-ce + + # Version of the terraform module that deployed the app + moduleVersion = 1.0.0 + } +} diff --git a/config/config.pubsub.minimal.hocon b/config/config.pubsub.minimal.hocon new file mode 100644 index 000000000..60b10232b --- /dev/null +++ b/config/config.pubsub.minimal.hocon @@ -0,0 +1,15 @@ +{ + "input": { + "subscription": "projects/test-project/subscriptions/collector-payloads-sub" + } + + "output": { + "good": { + "topic": "projects/test-project/topics/enriched" + } + + "bad": { + "topic": "projects/test-project/topics/bad" + } + } +} diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala index 84f1c868e..783f94f06 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala @@ -210,7 +210,7 @@ object Environment { private def getRegionFromConfig(file: ConfigFile): Option[String] = file.input match { - case Kinesis(_, _, region, _, _, _, _) => + case Kinesis(_, _, region, _, _, _, _, _, _) => region case _ => None diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFile.scala 
b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFile.scala index bc028497e..f4d0e4615 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFile.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFile.scala @@ -58,8 +58,11 @@ object ConfigFile { case ConfigFile(_, _, _, Some(aup), _, _) if aup._1 <= 0L => "assetsUpdatePeriod in config file cannot be less than 0".asLeft // TODO: use newtype // Remove pii output if streamName and region empty - case c @ ConfigFile(_, Outputs(good, Some(Output.Kinesis(s, r, _, _, _, _, _, _, _, _, _, _, _)), bad), _, _, _) - if(s.isEmpty && r.isEmpty) => c.copy(output = Outputs(good, None, bad)).asRight + case c @ ConfigFile(_, Outputs(good, Some(Output.Kinesis(s, _, _, _, _, _, _, _, _, _, _, _, _)), bad), _, _, _, _) if s.isEmpty => + c.copy(output = Outputs(good, None, bad)).asRight + // Remove pii output if topic empty + case c @ ConfigFile(_, Outputs(good, Some(Output.PubSub(t, _, _, _, _)), bad), _, _, _, _) if t.isEmpty => + c.copy(output = Outputs(good, None, bad)).asRight case other => other.asRight } implicit val configFileEncoder: Encoder[ConfigFile] = diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigs.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigs.scala index 7faf7128a..b2566f6e6 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigs.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigs.scala @@ -117,7 +117,7 @@ object ParsedConfigs { private[config] def outputAttributes(output: OutputConfig): EnrichedEvent => Map[String, String] = output match { - case OutputConfig.PubSub(_, Some(attributes), _, _, _, _) => + case OutputConfig.PubSub(_, Some(attributes), _, _, _) => val fields = ParsedConfigs.enrichedFieldsMap.filter { case (s, _) => attributes.contains(s) diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala index 33a1021e4..86b445928 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala @@ -53,15 +53,15 @@ object io { case class PubSub private ( subscription: String, - parallelPullCount: Option[Int], - maxQueueSize: Option[Int] + parallelPullCount: Int, + maxQueueSize: Int ) extends Input { val (project, name) = subscription.split("/").toList match { case List("projects", project, "subscriptions", name) => (project, name) case _ => - throw new IllegalArgumentException(s"Cannot construct Input.PubSub from $subscription") + throw new IllegalArgumentException(s"Subscription format $subscription invalid") } } case class FileSystem(dir: Path) extends Input @@ -154,9 +154,9 @@ object io { case other => other.asRight } .emap { - case PubSub(_, Some(p), _) if p < 0 => + case PubSub(_, p, _) if p < 0 => "PubSub parallelPullCount must be > 0".asLeft - case PubSub(_, _, Some(m)) if m < 0 => + case PubSub(_, _, m) if m < 0 => "PubSub maxQueueSize must be > 0".asLeft case other => other.asRight @@ -165,7 +165,11 @@ object io { 
deriveConfiguredEncoder[Input] } - case class Outputs(good: Output, pii: Option[Output], bad: Output) + case class Outputs( + good: Output, + pii: Option[Output], + bad: Output + ) object Outputs { implicit val outputsDecoder: Decoder[Outputs] = deriveConfiguredDecoder[Outputs] implicit val outputsEncoder: Encoder[Outputs] = deriveConfiguredEncoder[Outputs] @@ -177,17 +181,18 @@ object io { case class PubSub private ( topic: String, attributes: Option[Set[String]], - delayThreshold: Option[FiniteDuration], - maxBatchSize: Option[Long], - maxBatchBytes: Option[Long], - numCallbackExecutors: Option[Int] + delayThreshold: FiniteDuration, + maxBatchSize: Long, + maxBatchBytes: Long ) extends Output { val (project, name) = topic.split("/").toList match { + case _ if topic.isEmpty => + ("", "") case List("projects", project, "topics", name) => (project, name) case _ => - throw new IllegalArgumentException(s"Cannot construct Output.PubSub from $topic") + throw new IllegalArgumentException(s"Topic format $topic invalid") } } case class FileSystem(file: Path, maxBytes: Option[Long]) extends Output @@ -234,26 +239,24 @@ object io { implicit val outputDecoder: Decoder[Output] = deriveConfiguredDecoder[Output] .emap { - case s @ PubSub(top, _, _, _, _, _) => + case s @ PubSub(top, _, _, _, _) if top.nonEmpty => top.split("/").toList match { case List("projects", _, "topics", _) => s.asRight case _ => s"Topic must conform projects/project-name/topics/topic-name format, $top given".asLeft } - case Kinesis(s, r, _, _, _, _, _, _, _, _, _, _, _) if(s.isEmpty && r.nonEmpty) => + case Kinesis(s, r, _, _, _, _, _, _, _, _, _, _, _) if s.isEmpty && r.nonEmpty => "streamName needs to be set".asLeft case other => other.asRight } .emap { - case PubSub(_, _, Some(d), _, _, _) if d < Duration.Zero => + case PubSub(_, _, d, _, _) if d < Duration.Zero => "PubSub delay threshold cannot be less than 0".asLeft - case PubSub(_, _, _, Some(m), _, _) if m < 0 => + case PubSub(_, _, _, m, _) if m < 0 => "PubSub max batch size cannot be less than 0".asLeft - case PubSub(_, _, _, _, Some(m), _) if m < 0 => + case PubSub(_, _, _, _, m) if m < 0 => "PubSub max batch bytes cannot be less than 0".asLeft - case PubSub(_, _, _, _, _, Some(m)) if m < 0 => - "PubSub callback executors cannot be less than 0".asLeft case other => other.asRight } diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/CliConfigSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/CliConfigSpec.scala index e154472a5..a30bbe0f0 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/CliConfigSpec.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/CliConfigSpec.scala @@ -20,9 +20,9 @@ class CliConfigSpec extends Specification with CatsIO { "parseHocon" should { "parse valid HOCON" in { val string = """ - input = { - type = "PubSub" - subscription = "inputSub" + "input": { + "type": "PubSub" + "subscription": "projects/test-project/subscriptions/collector-payloads-sub" } """.stripMargin Base64Hocon.parseHocon(string) must beRight diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala index 252ef7219..14440226b 100644 --- 
a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala @@ -30,13 +30,13 @@ import org.specs2.mutable.Specification class ConfigFileSpec extends Specification with CatsIO { "parse" should { "parse reference example for PubSub" in { - val configPath = Paths.get(getClass.getResource("/config.pubsub.hocon.sample").toURI) + val configPath = Paths.get(getClass.getResource("/config.pubsub.extended.hocon").toURI) val expected = ConfigFile( - io.Input.PubSub("projects/test-project/subscriptions/inputSub", None, None), + io.Input.PubSub("projects/test-project/subscriptions/collector-payloads-sub", 1, 3000), io.Outputs( - io.Output.PubSub("projects/test-project/topics/good-topic", Some(Set("app_id")), None, None, None, None), - Some(io.Output.PubSub("projects/test-project/topics/pii-topic", None, None, None, None, None)), - io.Output.PubSub("projects/test-project/topics/bad-topic", None, None, None, None, None) + io.Output.PubSub("projects/test-project/topics/enriched", Some(Set("app_id")), 200.milliseconds, 1000, 10000000), + Some(io.Output.PubSub("projects/test-project/topics/pii", None, 200.milliseconds, 1000, 10000000)), + io.Output.PubSub("projects/test-project/topics/bad", None, 200.milliseconds, 1000, 10000000) ), io.Concurrency(256, 3), Some(7.days), @@ -70,7 +70,7 @@ class ConfigFileSpec extends Specification with CatsIO { } "parse reference example for Kinesis" in { - val configPath = Paths.get(getClass.getResource("/config.kinesis.reference.hocon").toURI) + val configPath = Paths.get(getClass.getResource("/config.kinesis.extended.hocon").toURI) val expected = ConfigFile( io.Input.Kinesis( "snowplow-enrich-kinesis", @@ -168,20 +168,31 @@ class ConfigFileSpec extends Specification with CatsIO { json"""{ "input": { "type": "PubSub", - "subscription": "projects/test-project/subscriptions/inputSub" + "subscription": "projects/test-project/subscriptions/inputSub", + "parallelPullCount": 1, + "maxQueueSize": 3000 }, "output": { "good": { "type": "PubSub", - "topic": "projects/test-project/topics/good-topic" + "topic": "projects/test-project/topics/good-topic", + "delayThreshold": "200 milliseconds", + "maxBatchSize": 1000, + "maxBatchBytes": 10000000 }, "pii": { "type": "PubSub", - "topic": "projects/test-project/topics/pii-topic" + "topic": "projects/test-project/topics/pii-topic", + "delayThreshold": "200 milliseconds", + "maxBatchSize": 1000, + "maxBatchBytes": 10000000 }, "bad": { "type": "PubSub", - "topic": "projects/test-project/topics/bad-topic" + "topic": "projects/test-project/topics/bad-topic", + "delayThreshold": "200 milliseconds", + "maxBatchSize": 1000, + "maxBatchBytes": 10000000 } }, "concurrency": { diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigsSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigsSpec.scala index a29f6270c..cfa620dd1 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigsSpec.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigsSpec.scala @@ -31,11 +31,13 @@ class ParsedConfigsSpec extends Specification with CatsIO { val invalidAttr2 = "invalidAttr2" val configFile = ConfigFile( - 
io.Input.PubSub("projects/test-project/subscriptions/inputSub", None, None), + io.Input.PubSub("projects/test-project/subscriptions/inputSub", 1, 3000), io.Outputs( - io.Output.PubSub("projects/test-project/topics/good-topic", Some(Set("app_id", invalidAttr1)), None, None, None, None), - Some(io.Output.PubSub("projects/test-project/topics/pii-topic", Some(Set("app_id", invalidAttr2)), None, None, None, None)), - io.Output.PubSub("projects/test-project/topics/bad-topic", None, None, None, None, None) + io.Output.PubSub("projects/test-project/topics/good-topic", Some(Set("app_id", invalidAttr1)), 200.milliseconds, 1000, 10000000), + Some( + io.Output.PubSub("projects/test-project/topics/pii-topic", Some(Set("app_id", invalidAttr2)), 200.milliseconds, 1000, 10000000) + ), + io.Output.PubSub("projects/test-project/topics/bad-topic", None, 200.milliseconds, 1000, 10000000) ), io.Concurrency(10000, 64), Some(7.days), @@ -72,7 +74,7 @@ class ParsedConfigsSpec extends Specification with CatsIO { "outputAttributes" should { "fetch attribute values" in { - val output = io.Output.PubSub("projects/test-project/topics/good-topic", Some(Set("app_id")), None, None, None, None) + val output = io.Output.PubSub("projects/test-project/topics/good-topic", Some(Set("app_id")), 200.milliseconds, 1000, 10000000) val ee = new EnrichedEvent() ee.app_id = "test_app" diff --git a/modules/kinesis/src/main/resources/application.conf b/modules/kinesis/src/main/resources/application.conf index 866f097b2..3d8228cb2 100644 --- a/modules/kinesis/src/main/resources/application.conf +++ b/modules/kinesis/src/main/resources/application.conf @@ -32,8 +32,6 @@ "type": "Kinesis" # we need all the fields to exist to have defaults "streamName": "" - # we need all the fields to exist to have defaults - "region": "" "backoffPolicy": { "minBackoff": 100 milliseconds "maxBackoff": 10 seconds diff --git a/modules/pubsub/src/main/resources/application.conf b/modules/pubsub/src/main/resources/application.conf new file mode 100644 index 000000000..05df53957 --- /dev/null +++ b/modules/pubsub/src/main/resources/application.conf @@ -0,0 +1,46 @@ +{ + "input": { + "type": "PubSub" + "parallelPullCount": 1 + "maxQueueSize": 3000 + } + + "output": { + "good": { + "type": "PubSub" + "delayThreshold": 200 milliseconds + "maxBatchSize": 1000 + "maxBatchBytes": 10000000 + } + + "pii": { + "type": "PubSub" + # we need all the fields to exist to have defaults + "topic": "" + "delayThreshold": 200 milliseconds + "maxBatchSize": 1000 + "maxBatchBytes": 10000000 + } + + "bad": { + "type": "PubSub" + "delayThreshold": 200 milliseconds + "maxBatchSize": 1000 + "maxBatchBytes": 10000000 + } + } + + "concurrency" : { + "enrich": 256 + "sink": 3 + } + + "telemetry": { + "disable": false + "interval": 15 minutes + "method": POST + "collectorUri": collector-g.snowplowanalytics.com + "collectorPort": 443 + "secure": true + } +} diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Sink.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Sink.scala index 93c3bb69a..040c1dd76 100644 --- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Sink.scala +++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Sink.scala @@ -12,8 +12,6 @@ */ package com.snowplowanalytics.snowplow.enrich.pubsub -import scala.concurrent.duration._ - import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger @@ -33,29 +31,6 @@ object Sink { private 
implicit def unsafeLogger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F] - /** - * Set the delay threshold to use for batching. After this amount of time has elapsed (counting - * from the first element added), the elements will be wrapped up in a batch and sent. - * - * If the source MaxQueueSize is small, then the queue could block the app by waiting between - * batches if DelayThreshold is too large. If the source MaxQueueSize is sufficiently large, - * then the DelayThreshold should not ever cause blocking. - */ - val DefaultDelayThreshold: FiniteDuration = 200.milliseconds - - /** - * A batch of messages will be emitted to PubSub when the batch reaches 1000 messages. - * We use 1000 because it is the maximum batch size allowed by PubSub. - * This overrides the permutive library default of `5` - */ - val DefaultPubsubMaxBatchSize = 1000L - - /** - * A batch of messages will be emitted to PubSub when the batch reaches 10 MB. - * We use 10MB because it is the maximum batch size allowed by PubSub. - */ - val DefaultPubsubMaxBatchBytes = 10000000L - def init[F[_]: Concurrent: ContextShift: Timer]( output: Output ): Resource[F, ByteSink[F]] = @@ -80,9 +55,9 @@ object Sink { output: Output.PubSub ): Resource[F, AttributedData[A] => F[Unit]] = { val config = PubsubProducerConfig[F]( - batchSize = output.maxBatchSize.getOrElse(DefaultPubsubMaxBatchSize), - requestByteThreshold = Some(output.maxBatchBytes.getOrElse(DefaultPubsubMaxBatchBytes)), - delayThreshold = output.delayThreshold.getOrElse(DefaultDelayThreshold), + batchSize = output.maxBatchSize, + requestByteThreshold = Some(output.maxBatchBytes), + delayThreshold = output.delayThreshold, onFailedTerminate = err => Logger[F].error(err)("PubSub sink termination error") ) diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Source.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Source.scala index 7aff8f1e3..832e93bcf 100644 --- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Source.scala +++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Source.scala @@ -27,24 +27,6 @@ import cats.effect.{Blocker, ContextShift, Sync} object Source { - /** - * Number of threads used internally by permutive library to handle incoming messages. - * These threads do very little "work" apart from writing the message to a concurrent Queue. - * Overrides the permutive library default of `3`. - */ - val DefaultParallelPullCount = 1 - - /** - * Configures the "max outstanding element count" of pubSub. - * - * This is the principal way we control concurrency in the app; it puts an upper bound on the number - * of events in memory at once. An event counts towards this limit starting from when it received - * by the permutive library, until we ack it (after publishing to output). The value must be large - * enough that it does not cause the sink to block whilst it is waiting for a batch to be - * completed. 
- */ - val DefaultMaxQueueSize = 3000 - def init[F[_]: Concurrent: ContextShift]( blocker: Blocker, input: Input @@ -65,8 +47,8 @@ object Source { val pubSubConfig = PubsubGoogleConsumerConfig( onFailedTerminate = onFailedTerminate, - parallelPullCount = input.parallelPullCount.getOrElse(DefaultParallelPullCount), - maxQueueSize = input.maxQueueSize.getOrElse(DefaultMaxQueueSize) + parallelPullCount = input.parallelPullCount, + maxQueueSize = input.maxQueueSize ) val projectId = Model.ProjectId(input.project) val subscriptionId = Model.Subscription(input.name)
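
The defaults that previously lived as constants in `Source.scala` and `Sink.scala` (parallel pull count, max queue size, delay threshold, max batch size/bytes) now come from the bundled `modules/pubsub/src/main/resources/application.conf`, which is why `config.pubsub.minimal.hocon` only needs the subscription and topic names. Below is a minimal sketch of how HOCON fallback merging supplies such defaults via the Typesafe Config API; the application's actual loading path may differ.

```scala
import java.io.File
import com.typesafe.config.{Config, ConfigFactory}

// Sketch only: overlay a user-supplied HOCON file on the bundled
// application.conf so that any key the user omits falls back to a default.
def loadConfig(userFile: File): Config =
  ConfigFactory
    .parseFile(userFile) // e.g. config.pubsub.minimal.hocon
    .withFallback(ConfigFactory.parseResources("application.conf"))
    .resolve()

// loadConfig(new File("config.pubsub.minimal.hocon")).getInt("input.maxQueueSize")
// ==> 3000, taken from application.conf because the minimal file omits it
```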
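
With the defaults guaranteed to be present, `Input.PubSub` and `Output.PubSub` carry these settings as plain fields rather than `Option`s, and the decoders in `io.scala` validate the topic and subscription strings up front (an empty topic is tolerated because the bundled defaults use `""` to mean the pii output is disabled). A standalone sketch of the same pattern-match validation, using hypothetical helper names:

```scala
// Hypothetical helpers mirroring the checks in io.scala.
// A topic must look like "projects/<project>/topics/<name>"; the empty string
// is accepted because application.conf sets pii.topic = "" as a placeholder.
def parseTopic(topic: String): Either[String, (String, String)] =
  topic.split("/").toList match {
    case _ if topic.isEmpty                        => Right(("", ""))
    case List("projects", project, "topics", name) => Right((project, name))
    case _ =>
      Left(s"Topic must conform projects/project-name/topics/topic-name format, $topic given")
  }

// A subscription must look like "projects/<project>/subscriptions/<name>".
def parseSubscription(subscription: String): Either[String, (String, String)] =
  subscription.split("/").toList match {
    case List("projects", project, "subscriptions", name) => Right((project, name))
    case _                                                 => Left(s"Subscription format $subscription invalid")
  }

// parseTopic("projects/test-project/topics/enriched") == Right(("test-project", "enriched"))
// parseSubscription("collector-payloads-sub").isLeft  == true
```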
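
The `ConfigFile` validation then removes the pii output entirely when that placeholder is left in place, for PubSub (empty topic) as well as Kinesis (empty stream name). A simplified, self-contained sketch of the PubSub case, with stand-in types:

```scala
import scala.concurrent.duration.FiniteDuration

// Simplified stand-ins for the io.scala types, for illustration only.
sealed trait Output
object Output {
  final case class PubSub(
    topic: String,
    attributes: Option[Set[String]],
    delayThreshold: FiniteDuration,
    maxBatchSize: Long,
    maxBatchBytes: Long
  ) extends Output
}
final case class Outputs(good: Output, pii: Option[Output], bad: Output)

// Mirrors the new ConfigFile check: application.conf declares pii.topic = ""
// so decoding always succeeds, and an empty topic means "pii disabled",
// in which case the pii output is dropped from the parsed configuration.
def dropDisabledPii(outputs: Outputs): Outputs =
  outputs.pii match {
    case Some(Output.PubSub(topic, _, _, _, _)) if topic.isEmpty =>
      outputs.copy(pii = None)
    case _ =>
      outputs
  }
```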