Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add transaction prefix option #1733

Merged
merged 6 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion core/src/main/mima-filters/5.0.0.backwards.excludes
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.javadsl.Transactional.flowWithOffsetContext")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.javadsl.Transactional.sinkWithOffsetContext")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.scaladsl.Transactional.flowWithOffsetContext")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.scaladsl.Transactional.sinkWithOffsetContext")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.scaladsl.Transactional.sinkWithOffsetContext")

# internal
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.ProducerSettings.this")
4 changes: 4 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ akka.kafka.producer {
# for exactly-once-semantics processing.
eos-commit-interval = 100ms

# This can be used to prepend a value to the beginning of a transaction id when using
# the `Transactional.sink` or `Transactional.flow` for exactly-once-semantics processing.
transaction-id-prefix = ""

# Properties defined by org.apache.kafka.clients.producer.ProducerConfig
# can be defined in this configuration section.
kafka-clients {
Expand Down
22 changes: 17 additions & 5 deletions core/src/main/scala/akka/kafka/ProducerSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ object ProducerSettings {
val parallelism = config.getInt("parallelism")
val dispatcher = config.getString("use-dispatcher")
val eosCommitInterval = config.getDuration("eos-commit-interval").asScala
val transactionIdPrefix = config.getString("transaction-id-prefix")
new ProducerSettings[K, V](
properties,
keySerializer,
Expand All @@ -88,7 +89,8 @@ object ProducerSettings {
dispatcher,
eosCommitInterval,
enrichAsync = None,
producerFactorySync = None
producerFactorySync = None,
transactionIdPrefix = transactionIdPrefix
)
}

Expand Down Expand Up @@ -233,7 +235,8 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
val dispatcher: String,
val eosCommitInterval: FiniteDuration,
val enrichAsync: Option[ProducerSettings[K, V] => Future[ProducerSettings[K, V]]],
val producerFactorySync: Option[ProducerSettings[K, V] => Producer[K, V]]
val producerFactorySync: Option[ProducerSettings[K, V] => Producer[K, V]],
val transactionIdPrefix: String
) {

@deprecated(
Expand Down Expand Up @@ -339,6 +342,12 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
def withEosCommitInterval(eosCommitInterval: java.time.Duration): ProducerSettings[K, V] =
copy(eosCommitInterval = eosCommitInterval.asScala)

/**
* The prefix to append to the generated transaction id when using the `Transactional.sink` or `Transactional.flow`.
*/
def withTransactionIdPrefix(transactionIdPrefix: String): ProducerSettings[K, V] =
copy(transactionIdPrefix = transactionIdPrefix)

/**
* Scala API.
* A hook to allow for resolving some settings asynchronously.
Expand Down Expand Up @@ -387,7 +396,8 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
dispatcher: String = dispatcher,
eosCommitInterval: FiniteDuration = eosCommitInterval,
enrichAsync: Option[ProducerSettings[K, V] => Future[ProducerSettings[K, V]]] = enrichAsync,
producerFactorySync: Option[ProducerSettings[K, V] => Producer[K, V]] = producerFactorySync
producerFactorySync: Option[ProducerSettings[K, V] => Producer[K, V]] = producerFactorySync,
transactionIdPrefix: String = transactionIdPrefix
): ProducerSettings[K, V] =
new ProducerSettings[K, V](properties,
keySerializer,
Expand All @@ -398,7 +408,8 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
dispatcher,
eosCommitInterval,
enrichAsync,
producerFactorySync)
producerFactorySync,
transactionIdPrefix)

private final val propertiesAllowList = Set(
"acks",
Expand Down Expand Up @@ -436,7 +447,8 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
s"dispatcher=$dispatcher," +
s"eosCommitInterval=${eosCommitInterval.toCoarsest}," +
s"enrichAsync=${enrichAsync.map(_ => "needs to be applied")}," +
s"producerFactorySync=${producerFactorySync.map(_ => "is defined").getOrElse("is undefined")})"
s"producerFactorySync=${producerFactorySync.map(_ => "is defined").getOrElse("is undefined")})" +
s"transactionIdPrefix=$transactionIdPrefix"
ennru marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/akka/kafka/scaladsl/Transactional.scala
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,13 @@ object Transactional {
* Publish records to Kafka topics and then continue the flow. The flow can only be used with a [[Transactional.source]] that
* emits a [[ConsumerMessage.TransactionalMessage]].
* The flow will override producer properties to enable Kafka exactly-once transactional support.
* A UUID is used for transaction id, optionally prefixed through [[ProducerSettings.transactionIdPrefix]]
*/
@nowarn("msg=deprecated")
def flow[K, V](
settings: ProducerSettings[K, V]
): Flow[Envelope[K, V, ConsumerMessage.PartitionOffset], Results[K, V, ConsumerMessage.PartitionOffset], NotUsed] =
flow(settings, UUID.randomUUID().toString)
flow(settings, settings.transactionIdPrefix + UUID.randomUUID().toString)

/**
* Publish records to Kafka topics and then continue the flow. The flow can only be used with a [[Transactional.source]] that
Expand Down
2 changes: 1 addition & 1 deletion docs/src/main/paradox/transactions.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ For more details see [KIP-447](https://cwiki.apache.org/confluence/display/KAFKA
The @apidoc[Transactional.sink](Transactional$) is similar to the @apidoc[Producer.committableSink](Producer$) in that messages will be automatically committed as part of a transaction. The @apidoc[Transactional.flow](Transactional$) or @apidoc[Transactional.sink](Transactional$) are required when connecting a consumer to a producer to achieve a transactional workflow.

They override producer properties `enable.idempotence` to `true` and `max.in.flight.requests.per.connection` to `1` as required by the Kafka producer to enable transactions.
The `transaction.timeout.ms` is set to 10s as recommended in [KIP-447](https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics).
The `transaction.timeout.ms` is set to 10s as recommended in [KIP-447](https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics). In addition, you can optionally set `akka.kafka.producer.transaction-id-prefix` to prefix in front of the generated transaction ID should your specifications require this level of control.

## Consume-Transform-Produce Workflow

Expand Down
32 changes: 32 additions & 0 deletions tests/src/test/scala/akka/kafka/scaladsl/TransactionsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,38 @@ class TransactionsSpec extends SpecBase with TestcontainersKafkaLike with Transa
}
}

"complete in happy-path scenario with a transaction prefix" in {
assertAllStagesStopped {
val sourceTopic = createTopic(1)
val sinkTopic = createTopic(2)
val group = createGroupId(1)

Await.result(produce(sourceTopic, 1 to 100), remainingOrDefault)

val consumerSettings = consumerDefaults.withGroupId(group)

val control =
transactionalCopyStream(consumerSettings,
txProducerDefaults.withTransactionIdPrefix("my-prefix-"),
sourceTopic,
sinkTopic,
10.seconds)
.toMat(Sink.ignore)(Keep.left)
.run()

val probeConsumerGroup = createGroupId(2)

val probeConsumer = valuesProbeConsumer(probeConsumerSettings(probeConsumerGroup), sinkTopic)

probeConsumer
.request(100)
.expectNextN((1 to 100).map(_.toString))

probeConsumer.cancel()
Await.result(control.shutdown(), remainingOrDefault)
}
}

"complete when messages are filtered out" in assertAllStagesStopped {
val sourceTopic = createTopic(1)
val sinkTopic = createTopic(2)
Expand Down