From 13f83e961e93d3434a7bcbe655915bb3f7d32098 Mon Sep 17 00:00:00 2001 From: Justin Pihony Date: Thu, 11 Apr 2024 17:35:40 -0400 Subject: [PATCH 1/6] Add transaction prefix option --- core/src/main/resources/reference.conf | 4 ++++ .../scala/akka/kafka/ProducerSettings.scala | 19 ++++++++++++++----- .../akka/kafka/scaladsl/Transactional.scala | 2 +- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 83fa291d6..72b2ddcb2 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -36,6 +36,10 @@ akka.kafka.producer { # for exactly-once-semantics processing. eos-commit-interval = 100ms + # This can be used to prepend a value to the beginning of a transaction id when using + # the `Transactional.sink` or `Transactional.flow` for exactly-once-semantics processing. + transaction-id-prefix = "" + # Properties defined by org.apache.kafka.clients.producer.ProducerConfig # can be defined in this configuration section. 
kafka-clients { diff --git a/core/src/main/scala/akka/kafka/ProducerSettings.scala b/core/src/main/scala/akka/kafka/ProducerSettings.scala index c403b1a08..2e6ec7ab2 100644 --- a/core/src/main/scala/akka/kafka/ProducerSettings.scala +++ b/core/src/main/scala/akka/kafka/ProducerSettings.scala @@ -78,6 +78,7 @@ object ProducerSettings { val parallelism = config.getInt("parallelism") val dispatcher = config.getString("use-dispatcher") val eosCommitInterval = config.getDuration("eos-commit-interval").asScala + val transactionIdPrefix = config.getString("transaction-id-prefix") new ProducerSettings[K, V]( properties, keySerializer, @@ -88,7 +89,8 @@ object ProducerSettings { dispatcher, eosCommitInterval, enrichAsync = None, - producerFactorySync = None + producerFactorySync = None, + transactionIdPrefix = transactionIdPrefix ) } @@ -233,7 +235,8 @@ class ProducerSettings[K, V] @InternalApi private[kafka] ( val dispatcher: String, val eosCommitInterval: FiniteDuration, val enrichAsync: Option[ProducerSettings[K, V] => Future[ProducerSettings[K, V]]], - val producerFactorySync: Option[ProducerSettings[K, V] => Producer[K, V]] + val producerFactorySync: Option[ProducerSettings[K, V] => Producer[K, V]], + val transactionIdPrefix: String = "" ) { @deprecated( @@ -339,6 +342,9 @@ class ProducerSettings[K, V] @InternalApi private[kafka] ( def withEosCommitInterval(eosCommitInterval: java.time.Duration): ProducerSettings[K, V] = copy(eosCommitInterval = eosCommitInterval.asScala) + def withTransactionIdPrefix(transactionIdPrefix: String): ProducerSettings[K, V] = + copy(transactionIdPrefix = transactionIdPrefix) + /** * Scala API. * A hook to allow for resolving some settings asynchronously. 
@@ -387,7 +393,8 @@ class ProducerSettings[K, V] @InternalApi private[kafka] ( dispatcher: String = dispatcher, eosCommitInterval: FiniteDuration = eosCommitInterval, enrichAsync: Option[ProducerSettings[K, V] => Future[ProducerSettings[K, V]]] = enrichAsync, - producerFactorySync: Option[ProducerSettings[K, V] => Producer[K, V]] = producerFactorySync + producerFactorySync: Option[ProducerSettings[K, V] => Producer[K, V]] = producerFactorySync, + transactionIdPrefix: String = transactionIdPrefix ): ProducerSettings[K, V] = new ProducerSettings[K, V](properties, keySerializer, @@ -398,7 +405,8 @@ class ProducerSettings[K, V] @InternalApi private[kafka] ( dispatcher, eosCommitInterval, enrichAsync, - producerFactorySync) + producerFactorySync, + transactionIdPrefix) private final val propertiesAllowList = Set( "acks", @@ -436,7 +444,8 @@ class ProducerSettings[K, V] @InternalApi private[kafka] ( s"dispatcher=$dispatcher," + s"eosCommitInterval=${eosCommitInterval.toCoarsest}," + s"enrichAsync=${enrichAsync.map(_ => "needs to be applied")}," + - s"producerFactorySync=${producerFactorySync.map(_ => "is defined").getOrElse("is undefined")})" + s"producerFactorySync=${producerFactorySync.map(_ => "is defined").getOrElse("is undefined")})" + + s"transactionIdPrefix=$transactionIdPrefix" } /** diff --git a/core/src/main/scala/akka/kafka/scaladsl/Transactional.scala b/core/src/main/scala/akka/kafka/scaladsl/Transactional.scala index 7f1983839..7cd7d73f7 100644 --- a/core/src/main/scala/akka/kafka/scaladsl/Transactional.scala +++ b/core/src/main/scala/akka/kafka/scaladsl/Transactional.scala @@ -137,7 +137,7 @@ object Transactional { val flow = Flow .fromGraph( - new TransactionalProducerStage[K, V, ConsumerMessage.PartitionOffset](settings, transactionalId) + new TransactionalProducerStage[K, V, ConsumerMessage.PartitionOffset](settings, settings.transactionIdPrefix + transactionalId) ) .mapAsync(settings.parallelism)(identity) From 58a1a65222d0d36a29ae83b094a26124d462bbf0 
Mon Sep 17 00:00:00 2001 From: Justin Pihony Date: Thu, 11 Apr 2024 17:41:18 -0400 Subject: [PATCH 2/6] Add docs that I forgot to push --- core/src/main/scala/akka/kafka/ProducerSettings.scala | 4 ++++ docs/src/main/paradox/transactions.md | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/akka/kafka/ProducerSettings.scala b/core/src/main/scala/akka/kafka/ProducerSettings.scala index 2e6ec7ab2..15d952b28 100644 --- a/core/src/main/scala/akka/kafka/ProducerSettings.scala +++ b/core/src/main/scala/akka/kafka/ProducerSettings.scala @@ -342,6 +342,10 @@ class ProducerSettings[K, V] @InternalApi private[kafka] ( def withEosCommitInterval(eosCommitInterval: java.time.Duration): ProducerSettings[K, V] = copy(eosCommitInterval = eosCommitInterval.asScala) + /** + * Java API: + * The prefix to append to the generated transaction id when using the `Transactional.sink` or `Transactional.flow`. + */ def withTransactionIdPrefix(transactionIdPrefix: String): ProducerSettings[K, V] = copy(transactionIdPrefix = transactionIdPrefix) diff --git a/docs/src/main/paradox/transactions.md b/docs/src/main/paradox/transactions.md index 31121ef18..1c13d4bec 100644 --- a/docs/src/main/paradox/transactions.md +++ b/docs/src/main/paradox/transactions.md @@ -55,7 +55,7 @@ For more details see [KIP-447](https://cwiki.apache.org/confluence/display/KAFKA The @apidoc[Transactional.sink](Transactional$) is similar to the @apidoc[Producer.committableSink](Producer$) in that messages will be automatically committed as part of a transaction. The @apidoc[Transactional.flow](Transactional$) or @apidoc[Transactional.sink](Transactional$) are required when connecting a consumer to a producer to achieve a transactional workflow. They override producer properties `enable.idempotence` to `true` and `max.in.flight.requests.per.connection` to `1` as required by the Kafka producer to enable transactions. 
-The `transaction.timeout.ms` is set to 10s as recommended in [KIP-447](https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics). +The `transaction.timeout.ms` is set to 10s as recommended in [KIP-447](https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics). In addition, you can optionally set `akka.kafka.producer.transaction-id-prefix` to prepend a prefix to the generated transaction ID, should your requirements call for this level of control. ## Consume-Transform-Produce Workflow From b7dff10b2389bc66e75176e5165bd6aaf6a69aea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Fri, 12 Apr 2024 08:56:45 +0200 Subject: [PATCH 3/6] Add in the right place, avoid defaults params, mima filter --- core/src/main/mima-filters/5.0.0.backwards.excludes | 5 ++++- core/src/main/scala/akka/kafka/ProducerSettings.scala | 2 +- core/src/main/scala/akka/kafka/scaladsl/Transactional.scala | 5 +++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/core/src/main/mima-filters/5.0.0.backwards.excludes b/core/src/main/mima-filters/5.0.0.backwards.excludes index 8a1982280..c38b69e59 100644 --- a/core/src/main/mima-filters/5.0.0.backwards.excludes +++ b/core/src/main/mima-filters/5.0.0.backwards.excludes @@ -2,4 +2,7 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.javadsl.Transactional.flowWithOffsetContext") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.javadsl.Transactional.sinkWithOffsetContext") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.scaladsl.Transactional.flowWithOffsetContext") -ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.scaladsl.Transactional.sinkWithOffsetContext") \ No newline at end of file +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.scaladsl.Transactional.sinkWithOffsetContext") + +# internal 
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.ProducerSettings.this") \ No newline at end of file diff --git a/core/src/main/scala/akka/kafka/ProducerSettings.scala b/core/src/main/scala/akka/kafka/ProducerSettings.scala index 15d952b28..10b62deac 100644 --- a/core/src/main/scala/akka/kafka/ProducerSettings.scala +++ b/core/src/main/scala/akka/kafka/ProducerSettings.scala @@ -236,7 +236,7 @@ class ProducerSettings[K, V] @InternalApi private[kafka] ( val eosCommitInterval: FiniteDuration, val enrichAsync: Option[ProducerSettings[K, V] => Future[ProducerSettings[K, V]]], val producerFactorySync: Option[ProducerSettings[K, V] => Producer[K, V]], - val transactionIdPrefix: String = "" + val transactionIdPrefix: String ) { @deprecated( diff --git a/core/src/main/scala/akka/kafka/scaladsl/Transactional.scala b/core/src/main/scala/akka/kafka/scaladsl/Transactional.scala index 7cd7d73f7..447faca22 100644 --- a/core/src/main/scala/akka/kafka/scaladsl/Transactional.scala +++ b/core/src/main/scala/akka/kafka/scaladsl/Transactional.scala @@ -115,12 +115,13 @@ object Transactional { * Publish records to Kafka topics and then continue the flow. The flow can only be used with a [[Transactional.source]] that * emits a [[ConsumerMessage.TransactionalMessage]]. * The flow will override producer properties to enable Kafka exactly-once transactional support. + * A UUID is used for transaction id, optionally prefixed through [[ProducerSettings.transactionIdPrefix]] */ @nowarn("msg=deprecated") def flow[K, V]( settings: ProducerSettings[K, V] ): Flow[Envelope[K, V, ConsumerMessage.PartitionOffset], Results[K, V, ConsumerMessage.PartitionOffset], NotUsed] = - flow(settings, UUID.randomUUID().toString) + flow(settings, settings.transactionIdPrefix + UUID.randomUUID().toString) /** * Publish records to Kafka topics and then continue the flow. 
The flow can only be used with a [[Transactional.source]] that @@ -137,7 +138,7 @@ object Transactional { val flow = Flow .fromGraph( - new TransactionalProducerStage[K, V, ConsumerMessage.PartitionOffset](settings, settings.transactionIdPrefix + transactionalId) + new TransactionalProducerStage[K, V, ConsumerMessage.PartitionOffset](settings, transactionalId) ) .mapAsync(settings.parallelism)(identity) From 76258ea1d02bc1d41487884717eb79b8eddb119b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Fri, 12 Apr 2024 10:18:52 +0200 Subject: [PATCH 4/6] happy path test with prefix --- .../scala/akka/kafka/ProducerSettings.scala | 1 - .../kafka/scaladsl/TransactionsSpec.scala | 28 +++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/akka/kafka/ProducerSettings.scala b/core/src/main/scala/akka/kafka/ProducerSettings.scala index 10b62deac..a668bcaf7 100644 --- a/core/src/main/scala/akka/kafka/ProducerSettings.scala +++ b/core/src/main/scala/akka/kafka/ProducerSettings.scala @@ -343,7 +343,6 @@ class ProducerSettings[K, V] @InternalApi private[kafka] ( copy(eosCommitInterval = eosCommitInterval.asScala) /** - * Java API: * The prefix to append to the generated transaction id when using the `Transactional.sink` or `Transactional.flow`. 
*/ def withTransactionIdPrefix(transactionIdPrefix: String): ProducerSettings[K, V] = diff --git a/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala b/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala index 83ad0f31f..9a6e2a4d4 100644 --- a/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala +++ b/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala @@ -55,6 +55,34 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with Transa } } + "complete in happy-path scenario with a transaction prefix" in { + assertAllStagesStopped { + val sourceTopic = createTopic(1) + val sinkTopic = createTopic(2) + val group = createGroupId(1) + + Await.result(produce(sourceTopic, 1 to 100), remainingOrDefault) + + val consumerSettings = consumerDefaults.withGroupId(group) + + val control = + transactionalCopyStream(consumerSettings, txProducerDefaults.withTransactionIdPrefix("my-prefix-"), sourceTopic, sinkTopic, 10.seconds) + .toMat(Sink.ignore)(Keep.left) + .run() + + val probeConsumerGroup = createGroupId(2) + + val probeConsumer = valuesProbeConsumer(probeConsumerSettings(probeConsumerGroup), sinkTopic) + + probeConsumer + .request(100) + .expectNextN((1 to 100).map(_.toString)) + + probeConsumer.cancel() + Await.result(control.shutdown(), remainingOrDefault) + } + } + "complete when messages are filtered out" in assertAllStagesStopped { val sourceTopic = createTopic(1) val sinkTopic = createTopic(2) From bd5a7169a0cf1ab569d3bbcee803a592bc183ae8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Fri, 12 Apr 2024 14:36:00 +0200 Subject: [PATCH 5/6] formatting --- .../test/scala/akka/kafka/scaladsl/TransactionsSpec.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala b/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala index 9a6e2a4d4..bbb641537 100644 --- 
a/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala +++ b/tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala @@ -66,7 +66,11 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with Transa val consumerSettings = consumerDefaults.withGroupId(group) val control = - transactionalCopyStream(consumerSettings, txProducerDefaults.withTransactionIdPrefix("my-prefix-"), sourceTopic, sinkTopic, 10.seconds) + transactionalCopyStream(consumerSettings, + txProducerDefaults.withTransactionIdPrefix("my-prefix-"), + sourceTopic, + sinkTopic, + 10.seconds) .toMat(Sink.ignore)(Keep.left) .run() From ac6c957ea798f821b57bb0b3407e6af370e08d12 Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Fri, 12 Apr 2024 15:58:30 +0200 Subject: [PATCH 6/6] Comma in toString --- core/src/main/scala/akka/kafka/ProducerSettings.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/akka/kafka/ProducerSettings.scala b/core/src/main/scala/akka/kafka/ProducerSettings.scala index a668bcaf7..f3da07516 100644 --- a/core/src/main/scala/akka/kafka/ProducerSettings.scala +++ b/core/src/main/scala/akka/kafka/ProducerSettings.scala @@ -448,7 +448,7 @@ class ProducerSettings[K, V] @InternalApi private[kafka] ( s"eosCommitInterval=${eosCommitInterval.toCoarsest}," + s"enrichAsync=${enrichAsync.map(_ => "needs to be applied")}," + s"producerFactorySync=${producerFactorySync.map(_ => "is defined").getOrElse("is undefined")})" + - s"transactionIdPrefix=$transactionIdPrefix" + s",transactionIdPrefix=$transactionIdPrefix" } /**