enrich-kinesis #481
I've only given this a quick look, but it looks very good indeed 🎉
modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/Sink.scala
...s/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala
Looks good to me in general 👍 Left a few small comments in there.
...s/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala
modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Run.scala
...es/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/DynamoDbConfig.scala
...es/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/DynamoDbConfig.scala
modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/S3Client.scala
modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Source.scala
I just noticed I had an unsubmitted review from a few weeks ago, but most of it has already been addressed. Just one nit is left. Otherwise, looks great!
...common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala
...s/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala
@@ -12,22 +12,32 @@
 */
package com.snowplowanalytics.snowplow.enrich.common.fs2.io
`io` is shadowing the namespace of dependencies like `io.circe`. This confuses the IDE when importing `io.*` in `test`.
I suggest changing it to `inout` or some other name that is not used as a top-level domain.
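The shadowing described above can be reproduced in a small, self-contained sketch. To keep it in a single file with no dependencies, it makes two substitutions that are assumptions for illustration only: a nested object stands in for the `io` package, and the standard library's `java` root package stands in for `io.circe`. The names `Outer` and `Test` are hypothetical. The resolution rule is the same in both cases: the nearest binding with a given name wins over a root package of the same name, and `_root_` forces resolution from the top.

```scala
object Outer {
  // A local binding named `java`, playing the role of the `io` package
  // (hypothetical stand-in, not the project's code).
  object java {
    val marker: String = "local"
  }

  object Test {
    // The next line would not compile, because `java` resolves to
    // Outer.java, which has no member `util`:
    // val id = java.util.UUID.randomUUID()

    // Prefixing with _root_ bypasses the shadowing binding.
    val id: _root_.java.util.UUID = _root_.java.util.UUID.randomUUID()
  }
}
```

Renaming the package, as suggested, removes the need for `_root_` prefixes altogether.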
modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Enrich.scala
@@ -0,0 +1,55 @@
{
How many collector payloads do you think could be held in memory at the same time with these default settings? Say you're processing 5 Kinesis shards per enrich instance.
I count 190,000 (admittedly a worst-case scenario), and a further 10,000 per additional shard.
Here's how:
Each polling batch is 10,000 records. The fs2-aws library uses a buffer that holds 10 batches. On top of that, there can be an extra batch per shard on this line waiting to be enqueued.
Then you have the batch you're currently processing, plus three extra batches because of the prefetches here, here and here.
This is just to explain why I've been asking about memory problems, especially for large collector payloads. Understanding the scope of this problem will be difficult, because we would need to explore regular processing and also error-handling scenarios, e.g. Kinesis problems or enrichment problems. The latter are harder to predict or simulate.
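The counting in this comment can be written out as a small model. This is only a sketch of the arithmetic: the numbers come from the comment, while the names (`BufferModel`, `InFlightEstimate` and their fields) are hypothetical and do not exist in the codebase or in fs2-aws.

```scala
// Hypothetical model of the worst case described above.
final case class BufferModel(
  batchSize: Int = 10000,    // records per polling batch
  bufferedBatches: Int = 10, // batches held by the fs2-aws buffer
  prefetches: Int = 3        // extra batches from the three prefetch calls
) {
  // Buffer + one batch per shard waiting to be enqueued
  // + the batch being processed + the prefetched batches.
  def maxBatches(shards: Int): Int =
    bufferedBatches + shards + 1 + prefetches

  def maxRecords(shards: Int): Long =
    maxBatches(shards).toLong * batchSize
}

object InFlightEstimate {
  val model: BufferModel = BufferModel()
  val fiveShards: Long = model.maxRecords(5) // 19 batches = 190000 records
  val sixShards: Long = model.maxRecords(6)  // 20 batches = 200000 records
}
```

With 5 shards this gives 10 + 5 + 1 + 3 = 19 batches, i.e. 190,000 records, and each additional shard adds one more batch of 10,000.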
Very interesting!
I agree with your way of counting. There has been only one small change: I removed the `.prefetch` calls and now use `>1` for `sink.concurrency` here, so now it's `concurrency - 1` additional batches instead of three.
Because of the enqueuing for each shard I'm afraid the memory used by the app is unbounded. So we need to make sure that our auto-scaling is good enough so that new instances are spawned before the app gets a chance to go OutOfMemory. I'll look at our scaling strategy in Terraform. But I think that whatever the strategy, it would be great to use the average record size of a customer to determine the memory allocated to the JVM. /cc @oguzhanunlu @jbeemster
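As a rough illustration of sizing memory from the average record size, the sketch below combines the counting from the earlier comment with the `concurrency - 1` adjustment mentioned here. Everything in it is an assumption for illustration: the object and parameter names are hypothetical, the 2 KiB average record size is made up, and raw payload bytes will understate real JVM heap usage because of object overhead and any intermediate copies.

```scala
object HeapEstimate {
  // Buffer of 10 batches + one batch per shard waiting to be enqueued
  // + the batch being processed + (sinkConcurrency - 1) extra batches.
  def maxBatches(shards: Int, sinkConcurrency: Int): Int =
    10 + shards + 1 + (sinkConcurrency - 1)

  // Worst-case raw payload bytes held in memory (not total heap usage).
  def payloadBytes(
    shards: Int,
    sinkConcurrency: Int,
    avgRecordBytes: Long,
    batchSize: Int = 10000
  ): Long =
    maxBatches(shards, sinkConcurrency).toLong * batchSize * avgRecordBytes

  // Example with assumed values: 5 shards, concurrency 3, 2 KiB records.
  val example: Long = payloadBytes(shards = 5, sinkConcurrency = 3, avgRecordBytes = 2048L)
}
```

Because the per-shard term grows with the number of shards an instance processes, an estimate like this only holds for a fixed shard count, which is why the scaling strategy matters as well.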
...common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala
0.18.0.1-M3
`EnrichedEvent` don't match the type in `atomic` schema (e.g. here and here)
`% Runtime` to `sts`