From 0c02af996ce9b65d2c9e14ca6ccc52300ce8b8e4 Mon Sep 17 00:00:00 2001 From: Justin Pihony Date: Mon, 15 Apr 2024 03:01:42 -0400 Subject: [PATCH] feat: Add transaction prefix option (#1733) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Johan Andrén Co-authored-by: Enno Runne <458526+ennru@users.noreply.github.com> --- .../mima-filters/5.0.0.backwards.excludes | 5 ++- core/src/main/resources/reference.conf | 4 +++ .../scala/akka/kafka/ProducerSettings.scala | 22 ++++++++++--- .../akka/kafka/scaladsl/Transactional.scala | 3 +- docs/src/main/paradox/transactions.md | 2 +- .../kafka/scaladsl/TransactionsSpec.scala | 32 +++++++++++++++++++ 6 files changed, 60 insertions(+), 8 deletions(-) diff --git a/core/src/main/mima-filters/5.0.0.backwards.excludes b/core/src/main/mima-filters/5.0.0.backwards.excludes index 8a1982280..c38b69e59 100644 --- a/core/src/main/mima-filters/5.0.0.backwards.excludes +++ b/core/src/main/mima-filters/5.0.0.backwards.excludes @@ -2,4 +2,7 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.javadsl.Transactional.flowWithOffsetContext") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.javadsl.Transactional.sinkWithOffsetContext") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.scaladsl.Transactional.flowWithOffsetContext") -ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.scaladsl.Transactional.sinkWithOffsetContext") \ No newline at end of file +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.scaladsl.Transactional.sinkWithOffsetContext") + +# internal +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.ProducerSettings.this") \ No newline at end of file diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 83fa291d6..72b2ddcb2 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -36,6 +36,10 @@ 
akka.kafka.producer { # for exactly-once-semantics processing. eos-commit-interval = 100ms + # This can be used to prepend a value to the beginning of a transaction id when using + # the `Transactional.sink` or `Transactional.flow` for exactly-once-semantics processing. + transaction-id-prefix = "" + # Properties defined by org.apache.kafka.clients.producer.ProducerConfig # can be defined in this configuration section. kafka-clients { diff --git a/core/src/main/scala/akka/kafka/ProducerSettings.scala b/core/src/main/scala/akka/kafka/ProducerSettings.scala index c403b1a08..f3da07516 100644 --- a/core/src/main/scala/akka/kafka/ProducerSettings.scala +++ b/core/src/main/scala/akka/kafka/ProducerSettings.scala @@ -78,6 +78,7 @@ object ProducerSettings { val parallelism = config.getInt("parallelism") val dispatcher = config.getString("use-dispatcher") val eosCommitInterval = config.getDuration("eos-commit-interval").asScala + val transactionIdPrefix = config.getString("transaction-id-prefix") new ProducerSettings[K, V]( properties, keySerializer, @@ -88,7 +89,8 @@ object ProducerSettings { dispatcher, eosCommitInterval, enrichAsync = None, - producerFactorySync = None + producerFactorySync = None, + transactionIdPrefix = transactionIdPrefix ) } @@ -233,7 +235,8 @@ class ProducerSettings[K, V] @InternalApi private[kafka] ( val dispatcher: String, val eosCommitInterval: FiniteDuration, val enrichAsync: Option[ProducerSettings[K, V] => Future[ProducerSettings[K, V]]], - val producerFactorySync: Option[ProducerSettings[K, V] => Producer[K, V]] + val producerFactorySync: Option[ProducerSettings[K, V] => Producer[K, V]], + val transactionIdPrefix: String ) { @deprecated( @@ -339,6 +342,12 @@ class ProducerSettings[K, V] @InternalApi private[kafka] ( def withEosCommitInterval(eosCommitInterval: java.time.Duration): ProducerSettings[K, V] = copy(eosCommitInterval = eosCommitInterval.asScala) + /** + * The prefix to prepend to the generated transaction id when using the 
`Transactional.sink` or `Transactional.flow`. */ + def withTransactionIdPrefix(transactionIdPrefix: String): ProducerSettings[K, V] = + copy(transactionIdPrefix = transactionIdPrefix) + /** * Scala API. * A hook to allow for resolving some settings asynchronously. @@ -387,7 +396,8 @@ class ProducerSettings[K, V] @InternalApi private[kafka] ( dispatcher: String = dispatcher, eosCommitInterval: FiniteDuration = eosCommitInterval, enrichAsync: Option[ProducerSettings[K, V] => Future[ProducerSettings[K, V]]] = enrichAsync, - producerFactorySync: Option[ProducerSettings[K, V] => Producer[K, V]] = producerFactorySync + producerFactorySync: Option[ProducerSettings[K, V] => Producer[K, V]] = producerFactorySync, + transactionIdPrefix: String = transactionIdPrefix ): ProducerSettings[K, V] = new ProducerSettings[K, V](properties, keySerializer, @@ -398,7 +408,8 @@ class ProducerSettings[K, V] @InternalApi private[kafka] ( dispatcher, eosCommitInterval, enrichAsync, - producerFactorySync) + producerFactorySync, + transactionIdPrefix) private final val propertiesAllowList = Set( "acks", @@ -436,7 +447,8 @@ class ProducerSettings[K, V] @InternalApi private[kafka] ( s"dispatcher=$dispatcher," + s"eosCommitInterval=${eosCommitInterval.toCoarsest}," + s"enrichAsync=${enrichAsync.map(_ => "needs to be applied")}," + - s"producerFactorySync=${producerFactorySync.map(_ => "is defined").getOrElse("is undefined")})" + s"producerFactorySync=${producerFactorySync.map(_ => "is defined").getOrElse("is undefined")}," + + s"transactionIdPrefix=$transactionIdPrefix)" } /** diff --git a/core/src/main/scala/akka/kafka/scaladsl/Transactional.scala b/core/src/main/scala/akka/kafka/scaladsl/Transactional.scala index 7f1983839..447faca22 100644 --- a/core/src/main/scala/akka/kafka/scaladsl/Transactional.scala +++ b/core/src/main/scala/akka/kafka/scaladsl/Transactional.scala @@ -115,12 +115,13 @@ object Transactional { * Publish records to Kafka topics and then continue the flow. 
The flow can only be used with a [[Transactional.source]] that * emits a [[ConsumerMessage.TransactionalMessage]]. * The flow will override producer properties to enable Kafka exactly-once transactional support. + * A UUID is used for transaction id, optionally prefixed through [[ProducerSettings.transactionIdPrefix]] */ @nowarn("msg=deprecated") def flow[K, V]( settings: ProducerSettings[K, V] ): Flow[Envelope[K, V, ConsumerMessage.PartitionOffset], Results[K, V, ConsumerMessage.PartitionOffset], NotUsed] = - flow(settings, UUID.randomUUID().toString) + flow(settings, settings.transactionIdPrefix + UUID.randomUUID().toString) /** * Publish records to Kafka topics and then continue the flow. The flow can only be used with a [[Transactional.source]] that diff --git a/docs/src/main/paradox/transactions.md b/docs/src/main/paradox/transactions.md index 31121ef18..1c13d4bec 100644 --- a/docs/src/main/paradox/transactions.md +++ b/docs/src/main/paradox/transactions.md @@ -55,7 +55,7 @@ For more details see [KIP-447](https://cwiki.apache.org/confluence/display/KAFKA The @apidoc[Transactional.sink](Transactional$) is similar to the @apidoc[Producer.committableSink](Producer$) in that messages will be automatically committed as part of a transaction. The @apidoc[Transactional.flow](Transactional$) or @apidoc[Transactional.sink](Transactional$) are required when connecting a consumer to a producer to achieve a transactional workflow. They override producer properties `enable.idempotence` to `true` and `max.in.flight.requests.per.connection` to `1` as required by the Kafka producer to enable transactions. -The `transaction.timeout.ms` is set to 10s as recommended in [KIP-447](https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics). +The `transaction.timeout.ms` is set to 10s as recommended in [KIP-447](https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics). 
In addition, you can optionally set `akka.kafka.producer.transaction-id-prefix` to add a prefix in front of the generated transaction ID should your specifications require this level of control. ## Consume-Transform-Produce Workflow diff --git a/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala b/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala index 83ad0f31f..bbb641537 100644 --- a/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala +++ b/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala @@ -55,6 +55,38 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with Transa } } + "complete in happy-path scenario with a transaction prefix" in { + assertAllStagesStopped { + val sourceTopic = createTopic(1) + val sinkTopic = createTopic(2) + val group = createGroupId(1) + + Await.result(produce(sourceTopic, 1 to 100), remainingOrDefault) + + val consumerSettings = consumerDefaults.withGroupId(group) + + val control = + transactionalCopyStream(consumerSettings, + txProducerDefaults.withTransactionIdPrefix("my-prefix-"), + sourceTopic, + sinkTopic, + 10.seconds) + .toMat(Sink.ignore)(Keep.left) + .run() + + val probeConsumerGroup = createGroupId(2) + + val probeConsumer = valuesProbeConsumer(probeConsumerSettings(probeConsumerGroup), sinkTopic) + + probeConsumer + .request(100) + .expectNextN((1 to 100).map(_.toString)) + + probeConsumer.cancel() + Await.result(control.shutdown(), remainingOrDefault) + } + } + "complete when messages are filtered out" in assertAllStagesStopped { val sourceTopic = createTopic(1) val sinkTopic = createTopic(2)