Kafka Sink rewrite without fs2-kafka (#100)
Previously we were using the fs2-kafka wrapper around the KafkaProducer
when sending events to Kafka. The fs2-kafka wrapper executes every
`send` on the CE3 blocking thread pool. We found in the Snowplow
collector that this implementation can be problematic: under some
blocking scenarios it causes the CE3 blocking thread pool to create a
very large number of threads, and that many threads can cause an OOM.
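
As an illustration of the failure mode (a minimal sketch only — fs2-kafka's
real internals differ, and the object and method names here are made up):
when each record gets its own blocking region, every `send` that blocks,
e.g. while the producer's buffer is full, pins a separate thread on the CE3
blocking pool.

    import cats.effect.IO
    import cats.syntax.all._
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    object BlockingPerRecordSketch {
      // Illustrative only: one IO.blocking region per record.  If producer.send
      // blocks, each concurrently pending record occupies its own thread on the
      // CE3 blocking pool, so the pool can grow towards one thread per event.
      def sendEachInItsOwnBlockingRegion(
        producer: KafkaProducer[String, Array[Byte]],
        records: List[ProducerRecord[String, Array[Byte]]]
      ): IO[Unit] =
        records.parTraverse_(record => IO.blocking(producer.send(record)).void)
    }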

This new implementation still uses the CE3 blocking thread pool, but it
calls `send` many times within the same `Sync[F].blocking{...}` block.
This should prevent the problem where very many concurrent calls to
`Sync[F].blocking` cause the thread pool to grow to one thread per
pending event.
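
The shape of the change, as a simplified sketch of the new `impl` shown in
the diff below — the real code uses `Sync[F].interruptible` and the project's
batch type, while the object name, method name, and plain `List` here are
illustrative:

    import cats.effect.Sync
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    object BlockingPerBatchSketch {
      // The whole batch shares a single blocking region, so it occupies at most
      // one blocking-pool thread no matter how many records are in flight.
      def sendBatchInOneBlockingRegion[F[_]: Sync](
        producer: KafkaProducer[String, Array[Byte]],
        records: List[ProducerRecord[String, Array[Byte]]]
      ): F[Unit] =
        Sync[F].blocking {
          // enqueue everything first; send returns a java.util.concurrent.Future
          val futures = records.map(record => producer.send(record))
          // then wait for all acknowledgements before leaving the region
          futures.foreach(_.get)
        }
    }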

Note, this implementation differs from what we chose for the Snowplow
collector. For the latter, we used a dedicated single-thread executor
for calling `send`. The difference is that in common-streams we have
the luxury of working in batches, whereas the Snowplow collector tends
to receive events one-by-one, and thus needs to call `send` one-by-one.
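
For comparison, a hypothetical sketch of that collector-style alternative —
this is not the collector's actual code, and the object and helper names are
invented — in which a dedicated single-thread executor owns every call to
`send`, keeping the CE3 blocking pool out of the picture:

    import java.util.concurrent.Executors
    import scala.concurrent.ExecutionContext
    import cats.effect.{Async, Resource, Sync}
    import cats.implicits._
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    object DedicatedThreadSketch {
      // A single dedicated thread owns all calls to send, which suits a service
      // that receives and forwards events one-by-one.
      def dedicatedSendThread[F[_]: Sync]: Resource[F, ExecutionContext] =
        Resource
          .make(Sync[F].delay(Executors.newSingleThreadExecutor()))(es => Sync[F].delay(es.shutdown()))
          .map(ExecutionContext.fromExecutorService)

      def sendOnDedicatedThread[F[_]: Async](
        ec: ExecutionContext,
        producer: KafkaProducer[String, Array[Byte]],
        record: ProducerRecord[String, Array[Byte]]
      ): F[Unit] =
        // evalOn shifts the send call onto the dedicated thread; delivery results
        // would be observed via the producer callback in a real implementation
        Async[F].evalOn(Sync[F].delay(producer.send(record)).void, ec)
    }
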
istreeter authored Dec 17, 2024
1 parent b879065 commit 86f0316
Showing 1 changed file with 43 additions and 26 deletions.
@@ -7,49 +7,66 @@
  */
 package com.snowplowanalytics.snowplow.sinks.kafka
 
-import cats.effect.Async
-import cats.effect.kernel.Resource
-import cats.implicits._
-import cats.Monad
-import com.snowplowanalytics.snowplow.sinks.{Sink, Sinkable}
-import fs2.kafka._
+import cats.effect.{Async, Resource, Sync}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.header.Header
+import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
 
-import scala.reflect._
+import com.snowplowanalytics.snowplow.sinks.{Sink, Sinkable}
+import com.snowplowanalytics.snowplow.azure.AzureAuthenticationCallbackHandler
 
 import java.util.UUID
-
-import com.snowplowanalytics.snowplow.azure.AzureAuthenticationCallbackHandler
+import java.nio.charset.StandardCharsets
+import scala.reflect.ClassTag
+import scala.jdk.CollectionConverters._
 
 object KafkaSink {
 
   def resource[F[_]: Async, T <: AzureAuthenticationCallbackHandler](
     config: KafkaSinkConfig,
     authHandlerClass: ClassTag[T]
-  ): Resource[F, Sink[F]] = {
-    val producerSettings =
-      ProducerSettings[F, String, Array[Byte]]
-        .withProperty("sasl.login.callback.handler.class", authHandlerClass.runtimeClass.getName)
-        .withBootstrapServers(config.bootstrapServers)
-        .withProperties(config.producerConf)
-
+  ): Resource[F, Sink[F]] =
     for {
-      producer <- KafkaProducer[F].resource(producerSettings)
-    } yield fromFs2Producer(config, producer)
+      producer <- makeProducer(config, authHandlerClass)
+    } yield impl(config, producer)
+
+  private def makeProducer[F[_]: Async, T <: AzureAuthenticationCallbackHandler](
+    config: KafkaSinkConfig,
+    authHandlerClass: ClassTag[T]
+  ): Resource[F, KafkaProducer[String, Array[Byte]]] = {
+    val producerSettings = Map(
+      "bootstrap.servers" -> config.bootstrapServers,
+      "sasl.login.callback.handler.class" -> authHandlerClass.runtimeClass.getName,
+      "key.serializer" -> classOf[StringSerializer].getName,
+      "value.serializer" -> classOf[ByteArraySerializer].getName
+    ) ++ config.producerConf
+    val make = Sync[F].delay {
+      new KafkaProducer[String, Array[Byte]]((producerSettings: Map[String, AnyRef]).asJava)
+    }
+    Resource.make(make)(p => Sync[F].blocking(p.close))
   }
 
-  private def fromFs2Producer[F[_]: Monad](config: KafkaSinkConfig, producer: KafkaProducer[F, String, Array[Byte]]): Sink[F] =
+  private def impl[F[_]: Sync](config: KafkaSinkConfig, producer: KafkaProducer[String, Array[Byte]]): Sink[F] =
     Sink { batch =>
-      val records = batch.copyToChunk.map(toProducerRecord(config, _))
-      producer.produce(records).flatten.void
+      Sync[F].interruptible {
+        val futures = batch.asIterable.map { e =>
+          val record = toProducerRecord(config, e)
+          producer.send(record)
+        }.toIndexedSeq
+
+        futures.foreach(_.get)
+      }
     }
 
   private def toProducerRecord(config: KafkaSinkConfig, sinkable: Sinkable): ProducerRecord[String, Array[Byte]] = {
-    val headers = Headers.fromIterable {
-      sinkable.attributes.map { case (k, v) =>
-        Header(k, v)
+
+    val headers = sinkable.attributes.map { case (k, v) =>
+      new Header {
+        def key: String = k
+        def value: Array[Byte] = v.getBytes(StandardCharsets.UTF_8)
      }
     }
-    ProducerRecord(config.topicName, sinkable.partitionKey.getOrElse(UUID.randomUUID.toString), sinkable.bytes)
-      .withHeaders(headers)
+
+    new ProducerRecord(config.topicName, null, sinkable.partitionKey.getOrElse(UUID.randomUUID.toString), sinkable.bytes, headers.asJava)
   }
 }
