Skip to content

Commit

Permalink
feat: Add transaction prefix option (akka#1733)
Browse files Browse the repository at this point in the history
Co-authored-by: Johan Andrén <[email protected]>
Co-authored-by: Enno Runne <[email protected]>
  • Loading branch information
3 people authored Apr 15, 2024
1 parent f7091ff commit 0c02af9
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 8 deletions.
5 changes: 4 additions & 1 deletion core/src/main/mima-filters/5.0.0.backwards.excludes
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.javadsl.Transactional.flowWithOffsetContext")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.javadsl.Transactional.sinkWithOffsetContext")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.scaladsl.Transactional.flowWithOffsetContext")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.scaladsl.Transactional.sinkWithOffsetContext")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.scaladsl.Transactional.sinkWithOffsetContext")

# internal
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.ProducerSettings.this")
4 changes: 4 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ akka.kafka.producer {
# for exactly-once-semantics processing.
eos-commit-interval = 100ms

# This can be used to prepend a value to the beginning of a transaction id when using
# the `Transactional.sink` or `Transactional.flow` for exactly-once-semantics processing.
transaction-id-prefix = ""

# Properties defined by org.apache.kafka.clients.producer.ProducerConfig
# can be defined in this configuration section.
kafka-clients {
Expand Down
22 changes: 17 additions & 5 deletions core/src/main/scala/akka/kafka/ProducerSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ object ProducerSettings {
val parallelism = config.getInt("parallelism")
val dispatcher = config.getString("use-dispatcher")
val eosCommitInterval = config.getDuration("eos-commit-interval").asScala
val transactionIdPrefix = config.getString("transaction-id-prefix")
new ProducerSettings[K, V](
properties,
keySerializer,
Expand All @@ -88,7 +89,8 @@ object ProducerSettings {
dispatcher,
eosCommitInterval,
enrichAsync = None,
producerFactorySync = None
producerFactorySync = None,
transactionIdPrefix = transactionIdPrefix
)
}

Expand Down Expand Up @@ -233,7 +235,8 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
val dispatcher: String,
val eosCommitInterval: FiniteDuration,
val enrichAsync: Option[ProducerSettings[K, V] => Future[ProducerSettings[K, V]]],
val producerFactorySync: Option[ProducerSettings[K, V] => Producer[K, V]]
val producerFactorySync: Option[ProducerSettings[K, V] => Producer[K, V]],
val transactionIdPrefix: String
) {

@deprecated(
Expand Down Expand Up @@ -339,6 +342,12 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
def withEosCommitInterval(eosCommitInterval: java.time.Duration): ProducerSettings[K, V] =
copy(eosCommitInterval = eosCommitInterval.asScala)

/**
* The prefix to append to the generated transaction id when using the `Transactional.sink` or `Transactional.flow`.
*/
def withTransactionIdPrefix(transactionIdPrefix: String): ProducerSettings[K, V] =
copy(transactionIdPrefix = transactionIdPrefix)

/**
* Scala API.
* A hook to allow for resolving some settings asynchronously.
Expand Down Expand Up @@ -387,7 +396,8 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
dispatcher: String = dispatcher,
eosCommitInterval: FiniteDuration = eosCommitInterval,
enrichAsync: Option[ProducerSettings[K, V] => Future[ProducerSettings[K, V]]] = enrichAsync,
producerFactorySync: Option[ProducerSettings[K, V] => Producer[K, V]] = producerFactorySync
producerFactorySync: Option[ProducerSettings[K, V] => Producer[K, V]] = producerFactorySync,
transactionIdPrefix: String = transactionIdPrefix
): ProducerSettings[K, V] =
new ProducerSettings[K, V](properties,
keySerializer,
Expand All @@ -398,7 +408,8 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
dispatcher,
eosCommitInterval,
enrichAsync,
producerFactorySync)
producerFactorySync,
transactionIdPrefix)

private final val propertiesAllowList = Set(
"acks",
Expand Down Expand Up @@ -436,7 +447,8 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
s"dispatcher=$dispatcher," +
s"eosCommitInterval=${eosCommitInterval.toCoarsest}," +
s"enrichAsync=${enrichAsync.map(_ => "needs to be applied")}," +
s"producerFactorySync=${producerFactorySync.map(_ => "is defined").getOrElse("is undefined")})"
s"producerFactorySync=${producerFactorySync.map(_ => "is defined").getOrElse("is undefined")})" +
s",transactionIdPrefix=$transactionIdPrefix"
}

/**
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/akka/kafka/scaladsl/Transactional.scala
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,13 @@ object Transactional {
* Publish records to Kafka topics and then continue the flow. The flow can only be used with a [[Transactional.source]] that
* emits a [[ConsumerMessage.TransactionalMessage]].
* The flow will override producer properties to enable Kafka exactly-once transactional support.
* A UUID is used for transaction id, optionally prefixed through [[ProducerSettings.transactionIdPrefix]]
*/
@nowarn("msg=deprecated")
def flow[K, V](
settings: ProducerSettings[K, V]
): Flow[Envelope[K, V, ConsumerMessage.PartitionOffset], Results[K, V, ConsumerMessage.PartitionOffset], NotUsed] =
flow(settings, UUID.randomUUID().toString)
flow(settings, settings.transactionIdPrefix + UUID.randomUUID().toString)

/**
* Publish records to Kafka topics and then continue the flow. The flow can only be used with a [[Transactional.source]] that
Expand Down
2 changes: 1 addition & 1 deletion docs/src/main/paradox/transactions.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ For more details see [KIP-447](https://cwiki.apache.org/confluence/display/KAFKA
The @apidoc[Transactional.sink](Transactional$) is similar to the @apidoc[Producer.committableSink](Producer$) in that messages will be automatically committed as part of a transaction. The @apidoc[Transactional.flow](Transactional$) or @apidoc[Transactional.sink](Transactional$) are required when connecting a consumer to a producer to achieve a transactional workflow.

They override producer properties `enable.idempotence` to `true` and `max.in.flight.requests.per.connection` to `1` as required by the Kafka producer to enable transactions.
The `transaction.timeout.ms` is set to 10s as recommended in [KIP-447](https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics).
The `transaction.timeout.ms` is set to 10s as recommended in [KIP-447](https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics). In addition, you can optionally set `akka.kafka.producer.transaction-id-prefix` to prefix in front of the generated transaction ID should your specifications require this level of control.

## Consume-Transform-Produce Workflow

Expand Down
32 changes: 32 additions & 0 deletions tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,38 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with Transa
}
}

"complete in happy-path scenario with a transaction prefix" in {
assertAllStagesStopped {
val sourceTopic = createTopic(1)
val sinkTopic = createTopic(2)
val group = createGroupId(1)

Await.result(produce(sourceTopic, 1 to 100), remainingOrDefault)

val consumerSettings = consumerDefaults.withGroupId(group)

val control =
transactionalCopyStream(consumerSettings,
txProducerDefaults.withTransactionIdPrefix("my-prefix-"),
sourceTopic,
sinkTopic,
10.seconds)
.toMat(Sink.ignore)(Keep.left)
.run()

val probeConsumerGroup = createGroupId(2)

val probeConsumer = valuesProbeConsumer(probeConsumerSettings(probeConsumerGroup), sinkTopic)

probeConsumer
.request(100)
.expectNextN((1 to 100).map(_.toString))

probeConsumer.cancel()
Await.result(control.shutdown(), remainingOrDefault)
}
}

"complete when messages are filtered out" in assertAllStagesStopped {
val sourceTopic = createTopic(1)
val sinkTopic = createTopic(2)
Expand Down

0 comments on commit 0c02af9

Please sign in to comment.