
Commit

config refactored
benjben committed Jun 23, 2022
1 parent 572adf1 commit 5a64e5e
Showing 10 changed files with 52 additions and 277 deletions.
192 changes: 27 additions & 165 deletions config/config.kinesis.extended.hocon
@@ -89,69 +89,23 @@
# Otherwise, the partition key will be a random UUID.
# "partitionKey": "user_id"

# Optional. Policy for cats-retry to retry after failures writing to kinesis
# Optional. Policy to retry if writing to kinesis fails.
# This policy is used in 2 places:
# - When the PutRecords request errors
# - When the request succeeds but some records couldn't get inserted
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
"maxRetries": 10
}

# Optional. Maximum amount of time an enriched event may spend being buffered before it gets sent
"maxBufferedTime": 100 millis

# Optional. The KPL will consider a record failed if it cannot be sent within this deadline.
# The KPL then yields back to the JVM, which will log the error, and might retry sending.
"recordTtl": 20 seconds

# Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
"collection": {
# Maximum number of Kinesis records to pack into a PutRecords request
"maxCount": 500

# Maximum amount of data to send with a PutRecords request
"maxSize": 5242880
}

# Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
# If not specified, aggregation is deactivated
#"aggregation": {
# # Maximum number of enriched events to pack into an aggregated Kinesis record
# "maxCount": 4294967295
#
# # Maximum number of bytes to pack into an aggregated Kinesis record
# "maxSize": 51200
#}

# Optional. Maximum number of connections to open in the backend.
# HTTP requests are sent in parallel over multiple connections.
# Setting this too high may impact latency and consume additional resources
# without increasing throughput
"maxConnections": 24

# Optional. Minimum level of logs for the native KPL daemon.
# Logs show up on stderr
# Possible values: trace | debug | info | warning | error
"logLevel": "warning"
# Optional. Limits the number of events in a single PutRecords request.
# Several requests are made in parallel
"recordLimit": 500

# Optional. Use a custom Kinesis endpoint.
# Note this does not accept protocols or paths, only host names or ip addresses.
# There is no way to disable TLS.
# Needs to be specified along with customPort
# "customEndpoint": "localhost"

# Optional. Server port to connect to for Kinesis.
# Needs to be specified along with customEndpoint
# "customPort": 4566

# Optional. Use a custom Cloudwatch endpoint.
# Note this does not accept protocols or paths, only host names or ip addresses.
# There is no way to disable TLS
# Needs to be specified along with cloudwatchPort
# "cloudwatchEndpoint": "localhost"

# Optional. Server port to connect to for CloudWatch.
# Needs to be specified along with cloudwatchPort
# "cloudwatchPort": 4582
# Can be used for instance to work locally with localstack
# "customEndpoint": "https://localhost:4566"
}

# Pii events output
@@ -175,69 +129,23 @@
# Otherwise, the partition key will be a random UUID.
# "partitionKey": "user_id"

# Optional. Policy for cats-retry to retry after failures writing to kinesis
# Optional. Policy to retry if writing to kinesis fails.
# This policy is used in 2 places:
# - When the PutRecords request errors
# - When the request succeeds but some records couldn't get inserted
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
"maxRetries": 10
}

# Optional. Maximum amount of time an enriched event may spend being buffered before it gets sent
"maxBufferedTime": 100 millis

# Optional. The KPL will consider a record failed if it cannot be sent within this deadline.
# The KPL then yields back to the JVM, which will log the error, and might retry sending.
"recordTtl": 20 seconds

# Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
"collection": {
# Maximum number of Kinesis records to pack into a PutRecords request
"maxCount": 500

# Maximum amount of data to send with a PutRecords request
"maxSize": 5242880
}

# Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
# If not specified, aggregation is deactivated
#"aggregation": {
# # Maximum number of enriched events to pack into an aggregated Kinesis record
# "maxCount": 4294967295
#
# # Maximum number of bytes to pack into an aggregated Kinesis record
# "maxSize": 51200
#}

# Optional. Maximum number of connections to open in the backend.
# HTTP requests are sent in parallel over multiple connections.
# Setting this too high may impact latency and consume additional resources
# without increasing throughput
"maxConnections": 24

# Optional. Minimum level of logs for the native KPL daemon.
# Logs show up on stderr
# Possible values: trace | debug | info | warning | error
"logLevel": "warning"
# Optional. Limits the number of events in a single PutRecords request.
# Several requests are made in parallel
"recordLimit": 500

# Optional. Use a custom Kinesis endpoint.
# Note this does not accept protocols or paths, only host names or ip addresses.
# There is no way to disable TLS.
# Needs to be specified along with customPort
# "customEndpoint": "localhost"

# Optional. Server port to connect to for Kinesis.
# Needs to be specified along with customEndpoint
# "customPort": 4566

# Optional. Use a custom Cloudwatch endpoint.
# Note this does not accept protocols or paths, only host names or ip addresses.
# There is no way to disable TLS
# Needs to be specified along with cloudwatchPort
# "cloudwatchEndpoint": "localhost"

# Optional. Server port to connect to for CloudWatch.
# Needs to be specified along with cloudwatchPort
# "cloudwatchPort": 4582
# Can be used for instance to work locally with localstack
# "customEndpoint": "https://localhost:4566"
}

# Bad rows output
@@ -253,69 +161,23 @@
# https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
"region": "eu-central-1"

# Optional. Policy for cats-retry to retry after failures writing to kinesis
# Optional. Policy to retry if writing to kinesis fails.
# This policy is used in 2 places:
# - When the PutRecords request errors
# - When the request succeeds but some records couldn't get inserted
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
"maxRetries": 10
}

# Optional. Maximum amount of time an enriched event may spend being buffered before it gets sent
"maxBufferedTime": 100 millis

# Optional. The KPL will consider a record failed if it cannot be sent within this deadline.
# The KPL then yields back to the JVM, which will log the error, and might retry sending.
"recordTtl": 20 seconds

# Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
"collection": {
# Maximum number of Kinesis records to pack into a PutRecords request
"maxCount": 500

# Maximum amount of data to send with a PutRecords request
"maxSize": 5242880
}

# Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
# If not specified, aggregation is deactivated
#"aggregation": {
# # Maximum number of enriched events to pack into an aggregated Kinesis record
# "maxCount": 4294967295
#
# # Maximum number of bytes to pack into an aggregated Kinesis record
# "maxSize": 51200
#}

# Optional. Maximum number of connections to open in the backend.
# HTTP requests are sent in parallel over multiple connections.
# Setting this too high may impact latency and consume additional resources
# without increasing throughput
"maxConnections": 24

# Optional. Minimum level of logs for the native KPL daemon.
# Logs show up on stderr
# Possible values: trace | debug | info | warning | error
"logLevel": "warning"
# Optional. Limits the number of events in a single PutRecords request.
# Several requests are made in parallel
"recordLimit": 500

# Optional. Use a custom Kinesis endpoint.
# Note this does not accept protocols or paths, only host names or ip addresses.
# There is no way to disable TLS.
# Needs to be specified along with customPort
# "customEndpoint": "localhost"

# Optional. Server port to connect to for Kinesis.
# Needs to be specified along with customEndpoint
# "customPort": 4566

# Optional. Use a custom Cloudwatch endpoint.
# Note this does not accept protocols or paths, only host names or ip addresses.
# There is no way to disable TLS
# Needs to be specified along with cloudwatchPort
# "cloudwatchEndpoint": "localhost"

# Optional. Server port to connect to for CloudWatch.
# Needs to be specified along with cloudwatchPort
# "cloudwatchPort": 4582
# Can be used for instance to work locally with localstack
# "customEndpoint": "https://localhost:4566"
}
}
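
The comment removed by this change described backoffPolicy as a policy for cats-retry. Purely as an illustration of what minBackoff, maxBackoff and maxRetries mean together (an assumption about the semantics, not necessarily how the refactored sink wires it up), a capped exponential backoff expressed with cats-retry could look like this sketch:

import scala.concurrent.duration._
import cats.effect.IO
import retry.RetryPolicies

// Delays grow exponentially from minBackoff (100 ms) but are capped at
// maxBackoff (10 s), and retrying stops after maxRetries (10) attempts.
val backoff =
  RetryPolicies.capDelay[IO](
    10.seconds,
    RetryPolicies.exponentialBackoff[IO](100.milliseconds)
  )

val policy = backoff.join(RetryPolicies.limitRetries[IO](10))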

@@ -31,7 +31,7 @@ import com.snowplowanalytics.snowplow.badrows.Processor

import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{Input, Monitoring, Output, RetryCheckpointing}
import com.snowplowanalytics.snowplow.enrich.common.fs2.config.{CliConfig, ParsedConfigs}
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.{Sink, Source}
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.{FileSink, Source}
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients.Client

object Run {
@@ -75,7 +75,7 @@ object Run {
sinkPii = file.output.pii.map(out => initAttributedSink(blocker, out, mkSinkPii))
sinkBad = file.output.bad match {
case f: Output.FileSystem =>
Sink.fileSink[F](f, blocker)
FileSink.fileSink[F](f, blocker)
case _ =>
mkSinkBad(blocker, file.output.bad)
}
@@ -148,7 +148,7 @@
): Resource[F, AttributedByteSink[F]] =
output match {
case f: Output.FileSystem =>
Sink.fileSink[F](f, blocker).map(sink => records => sink(records.map(_.data)))
FileSink.fileSink[F](f, blocker).map(sink => records => sink(records.map(_.data)))
case _ =>
mkSinkGood(blocker, output)
}
@@ -109,7 +109,7 @@ object ParsedConfigs {
if (invalidAttributes.nonEmpty) NonEmptyList(invalidAttributes.head, invalidAttributes.tail.toList).invalid
else output.valid
}
case OutputConfig.Kinesis(_, _, Some(key), _, _, _, _, _, _, _, _, _, _, _) if !enrichedFieldsMap.contains(key) =>
case OutputConfig.Kinesis(_, _, Some(key), _, _, _) if !enrichedFieldsMap.contains(key) =>
NonEmptyList.one(s"Partition key $key not valid").invalid
case _ =>
output.valid
@@ -123,7 +123,7 @@
attributes.contains(s)
}
attributesFromFields(fields)
case OutputConfig.Kinesis(_, _, Some(key), _, _, _, _, _, _, _, _, _, _, _) =>
case OutputConfig.Kinesis(_, _, Some(key), _, _, _) =>
val fields = ParsedConfigs.enrichedFieldsMap.filter {
case (s, _) =>
s == key
@@ -223,16 +223,8 @@ object io {
region: Option[String],
partitionKey: Option[String],
backoffPolicy: BackoffPolicy,
recordTtl: FiniteDuration,
maxBufferedTime: FiniteDuration,
collection: Collection,
aggregation: Option[Aggregation],
maxConnections: Long,
logLevel: String,
customEndpoint: Option[URI],
customPort: Option[Long],
cloudwatchEndpoint: Option[URI],
cloudwatchPort: Option[Long]
recordLimit: Int,
customEndpoint: Option[URI]
) extends Output

case class BackoffPolicy(
@@ -247,22 +239,6 @@
deriveConfiguredEncoder[BackoffPolicy]
}

case class Collection(maxCount: Long, maxSize: Long)
object Collection {
implicit def collectionDecoder: Decoder[Collection] =
deriveConfiguredDecoder[Collection]
implicit def collectionEncoder: Encoder[Collection] =
deriveConfiguredEncoder[Collection]
}

case class Aggregation(maxCount: Long, maxSize: Long)
object Aggregation {
implicit def aggregationDecoder: Decoder[Aggregation] =
deriveConfiguredDecoder[Aggregation]
implicit def aggregationEncoder: Encoder[Aggregation] =
deriveConfiguredEncoder[Aggregation]
}

implicit val outputDecoder: Decoder[Output] =
deriveConfiguredDecoder[Output]
.emap {
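
Put together, the refactored Output.Kinesis now carries six fields; the first one (the stream name) sits above the hunk shown here, so it is assumed below. A minimal construction sketch, reusing the values that appear in the updated ConfigFileSpec further down:

import java.net.URI
import scala.concurrent.duration._

// Sketch only: "enriched" is a hypothetical stream name, and io refers to this
// configuration module; the remaining values mirror the test fixture below.
val kinesisOutput = io.Output.Kinesis(
  "enriched",                                          // stream name (assumed)
  Some("eu-central-1"),                                 // region
  None,                                                 // partitionKey
  io.Output.BackoffPolicy(100.millis, 10.seconds, 10),  // backoffPolicy
  500,                                                   // recordLimit
  Some(URI.create("https://localhost:4566"))              // customEndpoint, e.g. Localstack
)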
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021 Snowplow Analytics Ltd. All rights reserved.
* Copyright (c) 2020-2022 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
@@ -26,7 +26,7 @@ import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Output.{FileSy

import com.snowplowanalytics.snowplow.enrich.common.fs2.ByteSink

object Sink {
object FileSink {

def fileSink[F[_]: Concurrent: ContextShift](config: FileSystemConfig, blocker: Blocker): Resource[F, ByteSink[F]] =
config.maxBytes match {
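
A minimal usage sketch of the renamed object, following the cats-effect 2 constraints in the signature above (Concurrent, ContextShift, Blocker). Here fileConfig stands for a FileSystemConfig value, the file-output type imported at the top of this file, which would normally come from the parsed HOCON rather than being built by hand:

import cats.effect.{Blocker, ContextShift, IO, Resource}
import com.snowplowanalytics.snowplow.enrich.common.fs2.ByteSink
import com.snowplowanalytics.snowplow.enrich.common.fs2.io.FileSink

// Blocker supplies the dedicated blocking-IO pool; the sink is acquired as a
// cats-effect Resource, matching fileSink's return type.
def badRowsFileSink(
  fileConfig: FileSystemConfig
)(implicit cs: ContextShift[IO]): Resource[IO, ByteSink[IO]] =
  Blocker[IO].flatMap { blocker =>
    FileSink.fileSink[IO](fileConfig, blocker)
  }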
@@ -104,15 +104,7 @@ class ConfigFileSpec extends Specification with CatsIO {
Some("eu-central-1"),
None,
io.Output.BackoffPolicy(100.millis, 10.seconds, 10),
20.seconds,
100.millis,
io.Output.Collection(500, 5242880),
None,
24,
"warning",
None,
None,
None,
500,
None
),
Some(
@@ -121,15 +113,7 @@
Some("eu-central-1"),
None,
io.Output.BackoffPolicy(100.millis, 10.seconds, 10),
20.seconds,
100.millis,
io.Output.Collection(500, 5242880),
None,
24,
"warning",
None,
None,
None,
500,
None
)
),
@@ -138,15 +122,7 @@
Some("eu-central-1"),
None,
io.Output.BackoffPolicy(100.millis, 10.seconds, 10),
20.seconds,
100.millis,
io.Output.Collection(500, 5242880),
None,
24,
"warning",
None,
None,
None,
500,
None
)
),
