
Kafka Sink rewrite without fs2-kafka #100

Merged
merged 1 commit into develop from kafka-sink-improvement on Dec 17, 2024
Conversation

istreeter (Contributor)

Previously we were using the fs2-kafka wrapper around the KafkaProducer when sending events to Kafka. The fs2-kafka wrapper executes every `send` on the CE3 blocking thread pool. We found in the Snowplow collector that this implementation can be problematic: under some blocking scenarios it causes the CE3 blocking thread pool to create a very large number of threads, and the huge number of threads could cause an OOM.

This new implementation still uses the CE3 blocking thread pool, but it calls `send` many times within the same `Sync[F].blocking{...}`. This should prevent the problem where very many concurrent calls to `Sync[F].blocking` trigger the thread pool to grow to one thread per pending event.

Note, this implementation is different from what we chose for the Snowplow collector. For the latter, we used a dedicated single-thread executor for calling `send`. The difference is because in common-streams we have the luxury of working in batches, whereas the Snowplow collector tends to receive events one-by-one, and thus needs to call `send` one-by-one.
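To make the batched approach concrete, here is a minimal sketch, not the actual PR code: it assumes a plain `List[E]` batch and takes the record-building function as a parameter, whereas the real sink builds records from its own config and batch types (as visible in the diff excerpt further down).

import cats.effect.Sync
import cats.syntax.all._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

def sinkBatch[F[_]: Sync, E](
  producer: KafkaProducer[Array[Byte], Array[Byte]],
  batch: List[E],
  toRecord: E => ProducerRecord[Array[Byte], Array[Byte]]
): F[Unit] =
  Sync[F]
    .interruptible {
      // All `send` calls for the batch happen inside ONE blocking region, so the
      // CE3 blocking pool grows by at most one thread per in-flight batch,
      // not one thread per pending event.
      batch.map(e => producer.send(toRecord(e)))
    }
    .flatMap { futures =>
      // Wait for broker acknowledgement of the whole batch, again in a single
      // blocking region rather than one blocking call per record.
      Sync[F].interruptible(futures.foreach(_.get()))
    }

The key point is that the number of simultaneous blocking regions is bounded by the number of in-flight batches, not by the number of pending records.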

Sync[F].interruptible {
  val futures = batch.asIterable.map { e =>
    val record = toProducerRecord(config, e)
    producer.send(record)
Contributor (commenting on the diff excerpt above):
Do I get this right that when the buffer of pending records waiting to be sent is full, this call becomes blocking, which is why we also include it in the Sync[F].interruptible { }?

Contributor Author (istreeter):
It becomes blocking under two circumstances:

  1. The buffer is full, as you say.
  2. The client needs to re-fetch topic metadata from the broker.

Case 1 can be avoided by increasing the size of the buffer. But case 2 is unavoidable. So it must be run inside either Sync[F].blocking or Sync[F].interruptible (which are approximately the same thing).
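For reference, the two cases correspond to standard Kafka producer settings; the values below are illustrative and not from this PR. A larger buffer.memory makes case 1 less likely, and max.block.ms caps how long a blocked send (full buffer or metadata fetch) waits before throwing.

import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")         // example broker
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, (64L * 1024 * 1024).toString) // default is 32 MiB
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000")                       // default is 60 s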

Contributor:

> The client needs to re-fetch topic metadata from the broker

I remember a PR where you regularly fetched all the metadata manually, in parallel, to avoid this blocking fetch. Do you now think that we should not do that?

Contributor Author (istreeter):

I know the PR you are thinking of, but I abandoned it because I was wrong about it. It is possible to pre-fetch the metadata when the app first starts, to avoid blocking on the first fetch. But it is not possible to avoid blocking when the client decides to periodically re-fetch the metadata.
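To illustrate the distinction, here is a hedged sketch of what start-up pre-fetching can do (names are illustrative, not from the abandoned PR): partitionsFor triggers a blocking metadata fetch for the topic, so running it once during initialisation avoids blocking on the first send. It does nothing about the periodic refresh the client performs later (governed by metadata.max.age.ms), which is why the send calls must stay inside Sync[F].blocking / Sync[F].interruptible regardless.

import cats.effect.Sync
import org.apache.kafka.clients.producer.KafkaProducer

def prefetchMetadata[F[_]: Sync](
  producer: KafkaProducer[Array[Byte], Array[Byte]],
  topic: String
): F[Unit] =
  Sync[F].blocking {
    // Blocks until the producer has metadata for `topic` (or max.block.ms expires).
    producer.partitionsFor(topic)
    ()
  }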

@istreeter merged commit 86f0316 into develop on Dec 17, 2024
1 check passed
@istreeter deleted the kafka-sink-improvement branch on December 17, 2024 08:46