From f3730154993213e813845268e3707c5bd68d9224 Mon Sep 17 00:00:00 2001 From: Greg Roelofs Date: Fri, 26 Jan 2024 16:18:42 -0800 Subject: [PATCH] Fix build breakage due to spotlessScalaCheck failures on upstream streams-scala code. (Ran './gradlew :spotlessApply' to auto-format; build succeeds now. Fix matches upstream Apache actions.) --- .../streams/scala/ImplicitConversions.scala | 22 ++-- .../apache/kafka/streams/scala/Serdes.scala | 6 +- .../kafka/streams/scala/StreamsBuilder.scala | 18 +-- .../scala/kstream/BranchedKStream.scala | 4 +- .../scala/kstream/CogroupedKStream.scala | 14 ++- .../streams/scala/kstream/Consumed.scala | 10 +- .../kafka/streams/scala/kstream/Joined.scala | 8 +- .../scala/kstream/KGroupedStream.scala | 13 ++- .../streams/scala/kstream/KGroupedTable.scala | 29 ++--- .../kafka/streams/scala/kstream/KStream.scala | 100 ++++++++++------ .../kafka/streams/scala/kstream/KTable.scala | 108 +++++++++++------- .../streams/scala/kstream/Materialized.scala | 15 ++- .../streams/scala/kstream/Produced.scala | 5 +- .../streams/scala/kstream/Repartitioned.scala | 5 +- .../SessionWindowedCogroupedKStream.scala | 8 +- .../kstream/SessionWindowedKStream.scala | 16 +-- .../streams/scala/kstream/StreamJoined.scala | 8 +- .../TimeWindowedCogroupedKStream.scala | 8 +- .../scala/kstream/TimeWindowedKStream.scala | 16 +-- .../streams/scala/serialization/Serdes.scala | 12 +- ...inScalaIntegrationTestImplicitSerdes.scala | 4 +- .../kafka/streams/scala/TopologyTest.scala | 50 ++++---- ...mToTableJoinScalaIntegrationTestBase.scala | 42 ++++--- 23 files changed, 308 insertions(+), 213 deletions(-) diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala index ab8f319e525c9..5f7064be14eb0 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala @@ -80,13 +80,17 @@ object ImplicitConversions { implicit def groupedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Grouped[K, V] = Grouped.`with`[K, V] - implicit def joinedFromKeyValueOtherSerde[K, V, VO](implicit keySerde: Serde[K], - valueSerde: Serde[V], - otherValueSerde: Serde[VO]): Joined[K, V, VO] = + implicit def joinedFromKeyValueOtherSerde[K, V, VO](implicit + keySerde: Serde[K], + valueSerde: Serde[V], + otherValueSerde: Serde[VO] + ): Joined[K, V, VO] = Joined.`with`[K, V, VO] - implicit def materializedFromSerde[K, V, S <: StateStore](implicit keySerde: Serde[K], - valueSerde: Serde[V]): Materialized[K, V, S] = + implicit def materializedFromSerde[K, V, S <: StateStore](implicit + keySerde: Serde[K], + valueSerde: Serde[V] + ): Materialized[K, V, S] = Materialized.`with`[K, V, S] implicit def producedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Produced[K, V] = @@ -95,8 +99,10 @@ object ImplicitConversions { implicit def repartitionedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Repartitioned[K, V] = Repartitioned.`with`[K, V] - implicit def streamJoinFromKeyValueOtherSerde[K, V, VO](implicit keySerde: Serde[K], - valueSerde: Serde[V], - otherValueSerde: Serde[VO]): StreamJoined[K, V, VO] = + implicit def streamJoinFromKeyValueOtherSerde[K, V, VO](implicit + keySerde: Serde[K], + valueSerde: Serde[V], + otherValueSerde: Serde[VO] + ): StreamJoined[K, V, VO] = StreamJoined.`with`[K, V, VO] } diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala index a36fef83beb88..2e42090d13deb 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala @@ -58,8 +58,10 @@ object Serdes { } ) - def fromFn[T >: Null](serializer: (String, T) => Array[Byte], - deserializer: (String, Array[Byte]) => Option[T]): Serde[T] = + def fromFn[T >: Null]( + serializer: (String, T) => Array[Byte], + deserializer: (String, Array[Byte]) => Option[T] + ): Serde[T] = JSerdes.serdeFrom( new Serializer[T] { override def serialize(topic: String, data: T): Array[Byte] = serializer(topic, data) diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala index 3eeffc484ba52..9430a511f71a4 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala @@ -120,8 +120,8 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) { * @see #table(String) * @see `org.apache.kafka.streams.StreamsBuilder#table` */ - def table[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])( - implicit consumed: Consumed[K, V] + def table[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])(implicit + consumed: Consumed[K, V] ): KTable[K, V] = new KTable(inner.table[K, V](topic, consumed, materialized)) @@ -146,8 +146,8 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) { * @return a `GlobalKTable` for the specified topic * @see `org.apache.kafka.streams.StreamsBuilder#globalTable` */ - def globalTable[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])( - implicit consumed: Consumed[K, V] + def globalTable[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])(implicit + consumed: Consumed[K, V] ): GlobalKTable[K, V] = inner.globalTable(topic, consumed, materialized) @@ -177,10 +177,12 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) { "Use #addGlobalStore(StoreBuilder, String, Consumed, org.apache.kafka.streams.processor.api.ProcessorSupplier) instead.", "2.7.0" ) - def addGlobalStore[K, V](storeBuilder: StoreBuilder[_ <: StateStore], - topic: String, - consumed: Consumed[K, V], - stateUpdateSupplier: ProcessorSupplier[K, V]): StreamsBuilderJ = + def addGlobalStore[K, V]( + storeBuilder: StoreBuilder[_ <: StateStore], + topic: String, + consumed: Consumed[K, V], + stateUpdateSupplier: ProcessorSupplier[K, V] + ): StreamsBuilderJ = inner.addGlobalStore(storeBuilder, topic, consumed, stateUpdateSupplier) /** diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/BranchedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/BranchedKStream.scala index 102c2577902cd..c606c0096898c 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/BranchedKStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/BranchedKStream.scala @@ -110,7 +110,7 @@ class BranchedKStream[K, V](val inner: BranchedKStreamJ[K, V]) { def noDefaultBranch(): Map[String, KStream[K, V]] = toScalaMap(inner.noDefaultBranch()) private def toScalaMap(m: util.Map[String, kstream.KStream[K, V]]): collection.immutable.Map[String, KStream[K, V]] = - m.asScala.map { - case (name, kStreamJ) => (name, new KStream(kStreamJ)) + m.asScala.map { case (name, kStreamJ) => + (name, new KStream(kStreamJ)) }.toMap } diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/CogroupedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/CogroupedKStream.scala index f4fe9fc5ca852..2bf58ca0e5670 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/CogroupedKStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/CogroupedKStream.scala @@ -43,8 +43,10 @@ class CogroupedKStream[KIn, VOut](val inner: CogroupedKStreamJ[KIn, VOut]) { * @param aggregator a function that computes a new aggregate result * @return a [[CogroupedKStream]] */ - def cogroup[VIn](groupedStream: KGroupedStream[KIn, VIn], - aggregator: (KIn, VIn, VOut) => VOut): CogroupedKStream[KIn, VOut] = + def cogroup[VIn]( + groupedStream: KGroupedStream[KIn, VIn], + aggregator: (KIn, VIn, VOut) => VOut + ): CogroupedKStream[KIn, VOut] = new CogroupedKStream(inner.cogroup(groupedStream.inner, aggregator.asAggregator)) /** @@ -58,8 +60,8 @@ class CogroupedKStream[KIn, VOut](val inner: CogroupedKStreamJ[KIn, VOut]) { * (rolling) aggregate for each key * @see `org.apache.kafka.streams.kstream.CogroupedKStream#aggregate` */ - def aggregate(initializer: => VOut)( - implicit materialized: Materialized[KIn, VOut, ByteArrayKeyValueStore] + def aggregate(initializer: => VOut)(implicit + materialized: Materialized[KIn, VOut, ByteArrayKeyValueStore] ): KTable[KIn, VOut] = new KTable(inner.aggregate((() => initializer).asInitializer, materialized)) /** @@ -74,8 +76,8 @@ class CogroupedKStream[KIn, VOut](val inner: CogroupedKStreamJ[KIn, VOut]) { * (rolling) aggregate for each key * @see `org.apache.kafka.streams.kstream.CogroupedKStream#aggregate` */ - def aggregate(initializer: => VOut, named: Named)( - implicit materialized: Materialized[KIn, VOut, ByteArrayKeyValueStore] + def aggregate(initializer: => VOut, named: Named)(implicit + materialized: Materialized[KIn, VOut, ByteArrayKeyValueStore] ): KTable[KIn, VOut] = new KTable(inner.aggregate((() => initializer).asInitializer, named, materialized)) /** diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala index a105ed65541a5..714df97c17595 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala @@ -61,8 +61,9 @@ object Consumed { * @tparam V value type * @return a new instance of [[Consumed]] */ - def `with`[K, V](timestampExtractor: TimestampExtractor)(implicit keySerde: Serde[K], - valueSerde: Serde[V]): ConsumedJ[K, V] = + def `with`[K, V]( + timestampExtractor: TimestampExtractor + )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] = ConsumedJ.`with`(timestampExtractor).withKeySerde(keySerde).withValueSerde(valueSerde) /** @@ -73,7 +74,8 @@ object Consumed { * @param resetPolicy the offset reset policy to be used. If `null` the default reset policy from config will be used * @return a new instance of [[Consumed]] */ - def `with`[K, V](resetPolicy: Topology.AutoOffsetReset)(implicit keySerde: Serde[K], - valueSerde: Serde[V]): ConsumedJ[K, V] = + def `with`[K, V]( + resetPolicy: Topology.AutoOffsetReset + )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] = ConsumedJ.`with`(resetPolicy).withKeySerde(keySerde).withValueSerde(valueSerde) } diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala index b6dbb0575a990..c614e1488f8c5 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala @@ -34,9 +34,11 @@ object Joined { * @param otherValueSerde the otherValue serde to use. If `null` the default value serde from config will be used * @return new [[org.apache.kafka.streams.kstream.Joined]] instance with the provided serdes */ - def `with`[K, V, VO](implicit keySerde: Serde[K], - valueSerde: Serde[V], - otherValueSerde: Serde[VO]): JoinedJ[K, V, VO] = + def `with`[K, V, VO](implicit + keySerde: Serde[K], + valueSerde: Serde[V], + otherValueSerde: Serde[VO] + ): JoinedJ[K, V, VO] = JoinedJ.`with`(keySerde, valueSerde, otherValueSerde) /** diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala index 44a3e568d859e..60a9c572d16e7 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala @@ -111,8 +111,9 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) { * latest (rolling) aggregate for each key * @see `org.apache.kafka.streams.kstream.KGroupedStream#reduce` */ - def reduce(reducer: (V, V) => V, - named: Named)(implicit materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = + def reduce(reducer: (V, V) => V, named: Named)(implicit + materialized: Materialized[K, V, ByteArrayKeyValueStore] + ): KTable[K, V] = new KTable(inner.reduce(reducer.asReducer, materialized)) /** @@ -125,8 +126,8 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) { * latest (rolling) aggregate for each key * @see `org.apache.kafka.streams.kstream.KGroupedStream#aggregate` */ - def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR)( - implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore] + def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR)(implicit + materialized: Materialized[K, VR, ByteArrayKeyValueStore] ): KTable[K, VR] = new KTable(inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, materialized)) @@ -141,8 +142,8 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) { * latest (rolling) aggregate for each key * @see `org.apache.kafka.streams.kstream.KGroupedStream#aggregate` */ - def aggregate[VR](initializer: => VR, named: Named)(aggregator: (K, V, VR) => VR)( - implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore] + def aggregate[VR](initializer: => VR, named: Named)(aggregator: (K, V, VR) => VR)(implicit + materialized: Materialized[K, VR, ByteArrayKeyValueStore] ): KTable[K, VR] = new KTable(inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, named, materialized)) diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala index 292155dbff2e8..3d9e052a2f17c 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala @@ -76,8 +76,9 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) { * latest (rolling) aggregate for each key * @see `org.apache.kafka.streams.kstream.KGroupedTable#reduce` */ - def reduce(adder: (V, V) => V, - subtractor: (V, V) => V)(implicit materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = + def reduce(adder: (V, V) => V, subtractor: (V, V) => V)(implicit + materialized: Materialized[K, V, ByteArrayKeyValueStore] + ): KTable[K, V] = new KTable(inner.reduce(adder.asReducer, subtractor.asReducer, materialized)) /** @@ -92,8 +93,8 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) { * latest (rolling) aggregate for each key * @see `org.apache.kafka.streams.kstream.KGroupedTable#reduce` */ - def reduce(adder: (V, V) => V, subtractor: (V, V) => V, named: Named)( - implicit materialized: Materialized[K, V, ByteArrayKeyValueStore] + def reduce(adder: (V, V) => V, subtractor: (V, V) => V, named: Named)(implicit + materialized: Materialized[K, V, ByteArrayKeyValueStore] ): KTable[K, V] = new KTable(inner.reduce(adder.asReducer, subtractor.asReducer, named, materialized)) @@ -109,8 +110,8 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) { * latest (rolling) aggregate for each key * @see `org.apache.kafka.streams.kstream.KGroupedTable#aggregate` */ - def aggregate[VR](initializer: => VR)(adder: (K, V, VR) => VR, subtractor: (K, V, VR) => VR)( - implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore] + def aggregate[VR](initializer: => VR)(adder: (K, V, VR) => VR, subtractor: (K, V, VR) => VR)(implicit + materialized: Materialized[K, VR, ByteArrayKeyValueStore] ): KTable[K, VR] = new KTable( inner.aggregate((() => initializer).asInitializer, adder.asAggregator, subtractor.asAggregator, materialized) @@ -129,14 +130,16 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) { * latest (rolling) aggregate for each key * @see `org.apache.kafka.streams.kstream.KGroupedTable#aggregate` */ - def aggregate[VR](initializer: => VR, named: Named)(adder: (K, V, VR) => VR, subtractor: (K, V, VR) => VR)( - implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore] + def aggregate[VR](initializer: => VR, named: Named)(adder: (K, V, VR) => VR, subtractor: (K, V, VR) => VR)(implicit + materialized: Materialized[K, VR, ByteArrayKeyValueStore] ): KTable[K, VR] = new KTable( - inner.aggregate((() => initializer).asInitializer, - adder.asAggregator, - subtractor.asAggregator, - named, - materialized) + inner.aggregate( + (() => initializer).asInitializer, + adder.asAggregator, + subtractor.asAggregator, + named, + materialized + ) ) } diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala index d097b8590350b..dedb4246aaf02 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala @@ -558,8 +558,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @return a [[KStream]] that contains more or less records with new key and value (possibly of different type) * @see `org.apache.kafka.streams.kstream.KStream#transform` */ - def transform[K1, V1](transformerSupplier: TransformerSupplier[K, V, KeyValue[K1, V1]], - stateStoreNames: String*): KStream[K1, V1] = + def transform[K1, V1]( + transformerSupplier: TransformerSupplier[K, V, KeyValue[K1, V1]], + stateStoreNames: String* + ): KStream[K1, V1] = new KStream(inner.transform(transformerSupplier, stateStoreNames: _*)) /** @@ -578,9 +580,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @return a [[KStream]] that contains more or less records with new key and value (possibly of different type) * @see `org.apache.kafka.streams.kstream.KStream#transform` */ - def transform[K1, V1](transformerSupplier: TransformerSupplier[K, V, KeyValue[K1, V1]], - named: Named, - stateStoreNames: String*): KStream[K1, V1] = + def transform[K1, V1]( + transformerSupplier: TransformerSupplier[K, V, KeyValue[K1, V1]], + named: Named, + stateStoreNames: String* + ): KStream[K1, V1] = new KStream(inner.transform(transformerSupplier, named, stateStoreNames: _*)) /** @@ -598,8 +602,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @return a [[KStream]] that contains more or less records with new key and value (possibly of different type) * @see `org.apache.kafka.streams.kstream.KStream#transform` */ - def flatTransform[K1, V1](transformerSupplier: TransformerSupplier[K, V, Iterable[KeyValue[K1, V1]]], - stateStoreNames: String*): KStream[K1, V1] = + def flatTransform[K1, V1]( + transformerSupplier: TransformerSupplier[K, V, Iterable[KeyValue[K1, V1]]], + stateStoreNames: String* + ): KStream[K1, V1] = new KStream(inner.flatTransform(transformerSupplier.asJava, stateStoreNames: _*)) /** @@ -618,9 +624,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @return a [[KStream]] that contains more or less records with new key and value (possibly of different type) * @see `org.apache.kafka.streams.kstream.KStream#transform` */ - def flatTransform[K1, V1](transformerSupplier: TransformerSupplier[K, V, Iterable[KeyValue[K1, V1]]], - named: Named, - stateStoreNames: String*): KStream[K1, V1] = + def flatTransform[K1, V1]( + transformerSupplier: TransformerSupplier[K, V, Iterable[KeyValue[K1, V1]]], + named: Named, + stateStoreNames: String* + ): KStream[K1, V1] = new KStream(inner.flatTransform(transformerSupplier.asJava, named, stateStoreNames: _*)) /** @@ -638,8 +646,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type) * @see `org.apache.kafka.streams.kstream.KStream#transformValues` */ - def flatTransformValues[VR](valueTransformerSupplier: ValueTransformerSupplier[V, Iterable[VR]], - stateStoreNames: String*): KStream[K, VR] = + def flatTransformValues[VR]( + valueTransformerSupplier: ValueTransformerSupplier[V, Iterable[VR]], + stateStoreNames: String* + ): KStream[K, VR] = new KStream(inner.flatTransformValues[VR](valueTransformerSupplier.asJava, stateStoreNames: _*)) /** @@ -658,9 +668,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type) * @see `org.apache.kafka.streams.kstream.KStream#transformValues` */ - def flatTransformValues[VR](valueTransformerSupplier: ValueTransformerSupplier[V, Iterable[VR]], - named: Named, - stateStoreNames: String*): KStream[K, VR] = + def flatTransformValues[VR]( + valueTransformerSupplier: ValueTransformerSupplier[V, Iterable[VR]], + named: Named, + stateStoreNames: String* + ): KStream[K, VR] = new KStream(inner.flatTransformValues[VR](valueTransformerSupplier.asJava, named, stateStoreNames: _*)) /** @@ -678,8 +690,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type) * @see `org.apache.kafka.streams.kstream.KStream#transformValues` */ - def flatTransformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, Iterable[VR]], - stateStoreNames: String*): KStream[K, VR] = + def flatTransformValues[VR]( + valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, Iterable[VR]], + stateStoreNames: String* + ): KStream[K, VR] = new KStream(inner.flatTransformValues[VR](valueTransformerSupplier.asJava, stateStoreNames: _*)) /** @@ -698,9 +712,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type) * @see `org.apache.kafka.streams.kstream.KStream#transformValues` */ - def flatTransformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, Iterable[VR]], - named: Named, - stateStoreNames: String*): KStream[K, VR] = + def flatTransformValues[VR]( + valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, Iterable[VR]], + named: Named, + stateStoreNames: String* + ): KStream[K, VR] = new KStream(inner.flatTransformValues[VR](valueTransformerSupplier.asJava, named, stateStoreNames: _*)) /** @@ -717,8 +733,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type) * @see `org.apache.kafka.streams.kstream.KStream#transformValues` */ - def transformValues[VR](valueTransformerSupplier: ValueTransformerSupplier[V, VR], - stateStoreNames: String*): KStream[K, VR] = + def transformValues[VR]( + valueTransformerSupplier: ValueTransformerSupplier[V, VR], + stateStoreNames: String* + ): KStream[K, VR] = new KStream(inner.transformValues[VR](valueTransformerSupplier, stateStoreNames: _*)) /** @@ -736,9 +754,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type) * @see `org.apache.kafka.streams.kstream.KStream#transformValues` */ - def transformValues[VR](valueTransformerSupplier: ValueTransformerSupplier[V, VR], - named: Named, - stateStoreNames: String*): KStream[K, VR] = + def transformValues[VR]( + valueTransformerSupplier: ValueTransformerSupplier[V, VR], + named: Named, + stateStoreNames: String* + ): KStream[K, VR] = new KStream(inner.transformValues[VR](valueTransformerSupplier, named, stateStoreNames: _*)) /** @@ -755,8 +775,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type) * @see `org.apache.kafka.streams.kstream.KStream#transformValues` */ - def transformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR], - stateStoreNames: String*): KStream[K, VR] = + def transformValues[VR]( + valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR], + stateStoreNames: String* + ): KStream[K, VR] = new KStream(inner.transformValues[VR](valueTransformerSupplier, stateStoreNames: _*)) /** @@ -774,9 +796,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type) * @see `org.apache.kafka.streams.kstream.KStream#transformValues` */ - def transformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR], - named: Named, - stateStoreNames: String*): KStream[K, VR] = + def transformValues[VR]( + valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR], + named: Named, + stateStoreNames: String* + ): KStream[K, VR] = new KStream(inner.transformValues[VR](valueTransformerSupplier, named, stateStoreNames: _*)) /** @@ -792,8 +816,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @see `org.apache.kafka.streams.kstream.KStream#process` */ @deprecated(since = "3.0", message = "Use process(ProcessorSupplier, String*) instead.") - def process(processorSupplier: () => org.apache.kafka.streams.processor.Processor[K, V], - stateStoreNames: String*): Unit = { + def process( + processorSupplier: () => org.apache.kafka.streams.processor.Processor[K, V], + stateStoreNames: String* + ): Unit = { val processorSupplierJ: org.apache.kafka.streams.processor.ProcessorSupplier[K, V] = () => processorSupplier() inner.process(processorSupplierJ, stateStoreNames: _*) } @@ -830,9 +856,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @see `org.apache.kafka.streams.kstream.KStream#process` */ @deprecated(since = "3.0", message = "Use process(ProcessorSupplier, String*) instead.") - def process(processorSupplier: () => org.apache.kafka.streams.processor.Processor[K, V], - named: Named, - stateStoreNames: String*): Unit = { + def process( + processorSupplier: () => org.apache.kafka.streams.processor.Processor[K, V], + named: Named, + stateStoreNames: String* + ): Unit = { val processorSupplierJ: org.apache.kafka.streams.processor.ProcessorSupplier[K, V] = () => processorSupplier() inner.process(processorSupplierJ, named, stateStoreNames: _*) } @@ -1039,7 +1067,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { */ def join[GK, GV, RV](globalKTable: GlobalKTable[GK, GV])( keyValueMapper: (K, V) => GK, - joiner: (V, GV) => RV, + joiner: (V, GV) => RV ): KStream[K, RV] = new KStream( inner.join[GK, GV, RV]( diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala index 7aba69d50e722..892f39eac76dc 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala @@ -86,9 +86,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @return a [[KTable]] that contains only those records that satisfy the given predicate * @see `org.apache.kafka.streams.kstream.KTable#filter` */ - def filter(predicate: (K, V) => Boolean, - named: Named, - materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = + def filter( + predicate: (K, V) => Boolean, + named: Named, + materialized: Materialized[K, V, ByteArrayKeyValueStore] + ): KTable[K, V] = new KTable(inner.filter(predicate.asPredicate, named, materialized)) /** @@ -138,9 +140,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @return a [[KTable]] that contains only those records that do not satisfy the given predicate * @see `org.apache.kafka.streams.kstream.KTable#filterNot` */ - def filterNot(predicate: (K, V) => Boolean, - named: Named, - materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = + def filterNot( + predicate: (K, V) => Boolean, + named: Named, + materialized: Materialized[K, V, ByteArrayKeyValueStore] + ): KTable[K, V] = new KTable(inner.filterNot(predicate.asPredicate, named, materialized)) /** @@ -198,9 +202,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @return a [[KTable]] that contains records with unmodified key and new values (possibly of different type) * @see `org.apache.kafka.streams.kstream.KTable#mapValues` */ - def mapValues[VR](mapper: V => VR, - named: Named, - materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = + def mapValues[VR]( + mapper: V => VR, + named: Named, + materialized: Materialized[K, VR, ByteArrayKeyValueStore] + ): KTable[K, VR] = new KTable(inner.mapValues[VR](mapper.asValueMapper, named, materialized)) /** @@ -258,9 +264,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @return a [[KTable]] that contains records with unmodified key and new values (possibly of different type) * @see `org.apache.kafka.streams.kstream.KTable#mapValues` */ - def mapValues[VR](mapper: (K, V) => VR, - named: Named, - materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] = + def mapValues[VR]( + mapper: (K, V) => VR, + named: Named, + materialized: Materialized[K, VR, ByteArrayKeyValueStore] + ): KTable[K, VR] = new KTable(inner.mapValues[VR](mapper.asValueMapperWithKey, named, materialized)) /** @@ -337,8 +345,10 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type) * @see `org.apache.kafka.streams.kstream.KStream#transformValues` */ - def transformValues[VR](valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR], - stateStoreNames: String*): KTable[K, VR] = + def transformValues[VR]( + valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR], + stateStoreNames: String* + ): KTable[K, VR] = new KTable(inner.transformValues[VR](valueTransformerWithKeySupplier, stateStoreNames: _*)) /** @@ -364,9 +374,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type) * @see `org.apache.kafka.streams.kstream.KStream#transformValues` */ - def transformValues[VR](valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR], - named: Named, - stateStoreNames: String*): KTable[K, VR] = + def transformValues[VR]( + valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR], + named: Named, + stateStoreNames: String* + ): KTable[K, VR] = new KTable(inner.transformValues[VR](valueTransformerWithKeySupplier, named, stateStoreNames: _*)) /** @@ -389,9 +401,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type) * @see `org.apache.kafka.streams.kstream.KStream#transformValues` */ - def transformValues[VR](valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR], - materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]], - stateStoreNames: String*): KTable[K, VR] = + def transformValues[VR]( + valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR], + materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]], + stateStoreNames: String* + ): KTable[K, VR] = new KTable(inner.transformValues[VR](valueTransformerWithKeySupplier, materialized, stateStoreNames: _*)) /** @@ -415,10 +429,12 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type) * @see `org.apache.kafka.streams.kstream.KStream#transformValues` */ - def transformValues[VR](valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR], - materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]], - named: Named, - stateStoreNames: String*): KTable[K, VR] = + def transformValues[VR]( + valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR], + materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]], + named: Named, + stateStoreNames: String* + ): KTable[K, VR] = new KTable(inner.transformValues[VR](valueTransformerWithKeySupplier, materialized, named, stateStoreNames: _*)) /** @@ -619,10 +635,12 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner, * one for each matched record-pair with the same key */ - def join[VR, KO, VO](other: KTable[KO, VO], - keyExtractor: Function[V, KO], - joiner: ValueJoiner[V, VO, VR], - materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]): KTable[K, VR] = + def join[VR, KO, VO]( + other: KTable[KO, VO], + keyExtractor: Function[V, KO], + joiner: ValueJoiner[V, VO, VR], + materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]] + ): KTable[K, VR] = new KTable(inner.join(other.inner, keyExtractor.asJavaFunction, joiner, materialized)) /** @@ -638,11 +656,13 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner, * one for each matched record-pair with the same key */ - def join[VR, KO, VO](other: KTable[KO, VO], - keyExtractor: Function[V, KO], - joiner: ValueJoiner[V, VO, VR], - named: Named, - materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]): KTable[K, VR] = + def join[VR, KO, VO]( + other: KTable[KO, VO], + keyExtractor: Function[V, KO], + joiner: ValueJoiner[V, VO, VR], + named: Named, + materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]] + ): KTable[K, VR] = new KTable(inner.join(other.inner, keyExtractor.asJavaFunction, joiner, named, materialized)) /** @@ -657,10 +677,12 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner, * one for each matched record-pair with the same key */ - def leftJoin[VR, KO, VO](other: KTable[KO, VO], - keyExtractor: Function[V, KO], - joiner: ValueJoiner[V, VO, VR], - materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]): KTable[K, VR] = + def leftJoin[VR, KO, VO]( + other: KTable[KO, VO], + keyExtractor: Function[V, KO], + joiner: ValueJoiner[V, VO, VR], + materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]] + ): KTable[K, VR] = new KTable(inner.leftJoin(other.inner, keyExtractor.asJavaFunction, joiner, materialized)) /** @@ -676,11 +698,13 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @return a [[KTable]] that contains join-records for each key and values computed by the given joiner, * one for each matched record-pair with the same key */ - def leftJoin[VR, KO, VO](other: KTable[KO, VO], - keyExtractor: Function[V, KO], - joiner: ValueJoiner[V, VO, VR], - named: Named, - materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]): KTable[K, VR] = + def leftJoin[VR, KO, VO]( + other: KTable[KO, VO], + keyExtractor: Function[V, KO], + joiner: ValueJoiner[V, VO, VR], + named: Named, + materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]] + ): KTable[K, VR] = new KTable(inner.leftJoin(other.inner, keyExtractor.asJavaFunction, joiner, named, materialized)) /** diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Materialized.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Materialized.scala index eb126f043ccc8..421ac5afeb3ad 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Materialized.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Materialized.scala @@ -50,8 +50,9 @@ object Materialized { * @param valueSerde the value serde to use. * @return a new [[Materialized]] instance with the given storeName */ - def as[K, V, S <: StateStore](storeName: String)(implicit keySerde: Serde[K], - valueSerde: Serde[V]): MaterializedJ[K, V, S] = + def as[K, V, S <: StateStore]( + storeName: String + )(implicit keySerde: Serde[K], valueSerde: Serde[V]): MaterializedJ[K, V, S] = MaterializedJ.as(storeName).withKeySerde(keySerde).withValueSerde(valueSerde) /** @@ -68,8 +69,9 @@ object Materialized { * @param valueSerde the value serde to use. * @return a new [[Materialized]] instance with the given supplier */ - def as[K, V](supplier: WindowBytesStoreSupplier)(implicit keySerde: Serde[K], - valueSerde: Serde[V]): MaterializedJ[K, V, ByteArrayWindowStore] = + def as[K, V]( + supplier: WindowBytesStoreSupplier + )(implicit keySerde: Serde[K], valueSerde: Serde[V]): MaterializedJ[K, V, ByteArrayWindowStore] = MaterializedJ.as(supplier).withKeySerde(keySerde).withValueSerde(valueSerde) /** @@ -86,8 +88,9 @@ object Materialized { * @param valueSerde the value serde to use. * @return a new [[Materialized]] instance with the given supplier */ - def as[K, V](supplier: SessionBytesStoreSupplier)(implicit keySerde: Serde[K], - valueSerde: Serde[V]): MaterializedJ[K, V, ByteArraySessionStore] = + def as[K, V]( + supplier: SessionBytesStoreSupplier + )(implicit keySerde: Serde[K], valueSerde: Serde[V]): MaterializedJ[K, V, ByteArraySessionStore] = MaterializedJ.as(supplier).withKeySerde(keySerde).withValueSerde(valueSerde) /** diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Produced.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Produced.scala index 351e0a5b375dd..48f917875867b 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Produced.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Produced.scala @@ -53,7 +53,8 @@ object Produced { * @see KStream#through(String, Produced) * @see KStream#to(String, Produced) */ - def `with`[K, V](partitioner: StreamPartitioner[K, V])(implicit keySerde: Serde[K], - valueSerde: Serde[V]): ProducedJ[K, V] = + def `with`[K, V]( + partitioner: StreamPartitioner[K, V] + )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ProducedJ[K, V] = ProducedJ.`with`(keySerde, valueSerde, partitioner) } diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Repartitioned.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Repartitioned.scala index 06a0f4f4ced0b..5f33efa78aa4f 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Repartitioned.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Repartitioned.scala @@ -65,8 +65,9 @@ object Repartitioned { * @return A new [[Repartitioned]] instance configured with keySerde, valueSerde, and partitioner * @see KStream#repartition(Repartitioned) */ - def `with`[K, V](partitioner: StreamPartitioner[K, V])(implicit keySerde: Serde[K], - valueSerde: Serde[V]): RepartitionedJ[K, V] = + def `with`[K, V]( + partitioner: StreamPartitioner[K, V] + )(implicit keySerde: Serde[K], valueSerde: Serde[V]): RepartitionedJ[K, V] = RepartitionedJ.`streamPartitioner`(partitioner).withKeySerde(keySerde).withValueSerde(valueSerde) /** diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedCogroupedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedCogroupedKStream.scala index e5c5823b6758d..1b20179d5d38d 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedCogroupedKStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedCogroupedKStream.scala @@ -40,8 +40,8 @@ class SessionWindowedCogroupedKStream[K, V](val inner: SessionWindowedCogroupedK * the latest (rolling) aggregate for each key within a window * @see `org.apache.kafka.streams.kstream.SessionWindowedCogroupedKStream#aggregate` */ - def aggregate(initializer: => V, merger: (K, V, V) => V)( - implicit materialized: Materialized[K, V, ByteArraySessionStore] + def aggregate(initializer: => V, merger: (K, V, V) => V)(implicit + materialized: Materialized[K, V, ByteArraySessionStore] ): KTable[Windowed[K], V] = new KTable(inner.aggregate((() => initializer).asInitializer, merger.asMerger, materialized)) @@ -56,8 +56,8 @@ class SessionWindowedCogroupedKStream[K, V](val inner: SessionWindowedCogroupedK * the latest (rolling) aggregate for each key within a window * @see `org.apache.kafka.streams.kstream.SessionWindowedCogroupedKStream#aggregate` */ - def aggregate(initializer: => V, merger: (K, V, V) => V, named: Named)( - implicit materialized: Materialized[K, V, ByteArraySessionStore] + def aggregate(initializer: => V, merger: (K, V, V) => V, named: Named)(implicit + materialized: Materialized[K, V, ByteArraySessionStore] ): KTable[Windowed[K], V] = new KTable(inner.aggregate((() => initializer).asInitializer, merger.asMerger, named, materialized)) diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala index 0a20444d68197..3d6e157ecdced 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala @@ -49,8 +49,8 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) { * the latest (rolling) aggregate for each key within a window * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#aggregate` */ - def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR, merger: (K, VR, VR) => VR)( - implicit materialized: Materialized[K, VR, ByteArraySessionStore] + def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR, merger: (K, VR, VR) => VR)(implicit + materialized: Materialized[K, VR, ByteArraySessionStore] ): KTable[Windowed[K], VR] = new KTable( inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, merger.asMerger, materialized) @@ -68,8 +68,8 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) { * the latest (rolling) aggregate for each key within a window * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#aggregate` */ - def aggregate[VR](initializer: => VR, named: Named)(aggregator: (K, V, VR) => VR, merger: (K, VR, VR) => VR)( - implicit materialized: Materialized[K, VR, ByteArraySessionStore] + def aggregate[VR](initializer: => VR, named: Named)(aggregator: (K, V, VR) => VR, merger: (K, VR, VR) => VR)(implicit + materialized: Materialized[K, VR, ByteArraySessionStore] ): KTable[Windowed[K], VR] = new KTable( inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, merger.asMerger, named, materialized) @@ -127,8 +127,8 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) { * the latest (rolling) aggregate for each key within a window * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#reduce` */ - def reduce(reducer: (V, V) => V)( - implicit materialized: Materialized[K, V, ByteArraySessionStore] + def reduce(reducer: (V, V) => V)(implicit + materialized: Materialized[K, V, ByteArraySessionStore] ): KTable[Windowed[K], V] = new KTable(inner.reduce(reducer.asReducer, materialized)) @@ -141,8 +141,8 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) { * the latest (rolling) aggregate for each key within a window * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#reduce` */ - def reduce(reducer: (V, V) => V, named: Named)( - implicit materialized: Materialized[K, V, ByteArraySessionStore] + def reduce(reducer: (V, V) => V, named: Named)(implicit + materialized: Materialized[K, V, ByteArraySessionStore] ): KTable[Windowed[K], V] = new KTable(inner.reduce(reducer.asReducer, named, materialized)) } diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/StreamJoined.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/StreamJoined.scala index 15bda130f21f4..9caad638e4cd7 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/StreamJoined.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/StreamJoined.scala @@ -35,9 +35,11 @@ object StreamJoined { * @param otherValueSerde the otherValue serde to use. If `null` the default value serde from config will be used * @return new [[StreamJoined]] instance with the provided serdes */ - def `with`[K, V, VO](implicit keySerde: Serde[K], - valueSerde: Serde[V], - otherValueSerde: Serde[VO]): StreamJoinedJ[K, V, VO] = + def `with`[K, V, VO](implicit + keySerde: Serde[K], + valueSerde: Serde[V], + otherValueSerde: Serde[VO] + ): StreamJoinedJ[K, V, VO] = StreamJoinedJ.`with`(keySerde, valueSerde, otherValueSerde) /** diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedCogroupedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedCogroupedKStream.scala index e9962a63d81f4..ad24228ecc686 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedCogroupedKStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedCogroupedKStream.scala @@ -39,8 +39,8 @@ class TimeWindowedCogroupedKStream[K, V](val inner: TimeWindowedCogroupedKStream * (rolling) aggregate for each key * @see `org.apache.kafka.streams.kstream.TimeWindowedCogroupedKStream#aggregate` */ - def aggregate(initializer: => V)( - implicit materialized: Materialized[K, V, ByteArrayWindowStore] + def aggregate(initializer: => V)(implicit + materialized: Materialized[K, V, ByteArrayWindowStore] ): KTable[Windowed[K], V] = new KTable(inner.aggregate((() => initializer).asInitializer, materialized)) @@ -54,8 +54,8 @@ class TimeWindowedCogroupedKStream[K, V](val inner: TimeWindowedCogroupedKStream * (rolling) aggregate for each key * @see `org.apache.kafka.streams.kstream.TimeWindowedCogroupedKStream#aggregate` */ - def aggregate(initializer: => V, named: Named)( - implicit materialized: Materialized[K, V, ByteArrayWindowStore] + def aggregate(initializer: => V, named: Named)(implicit + materialized: Materialized[K, V, ByteArrayWindowStore] ): KTable[Windowed[K], V] = new KTable(inner.aggregate((() => initializer).asInitializer, named, materialized)) diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala index fdb137c83c247..4fcf227e03723 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala @@ -47,8 +47,8 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) { * latest (rolling) aggregate for each key * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#aggregate` */ - def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR)( - implicit materialized: Materialized[K, VR, ByteArrayWindowStore] + def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR)(implicit + materialized: Materialized[K, VR, ByteArrayWindowStore] ): KTable[Windowed[K], VR] = new KTable(inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, materialized)) @@ -63,8 +63,8 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) { * latest (rolling) aggregate for each key * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#aggregate` */ - def aggregate[VR](initializer: => VR, named: Named)(aggregator: (K, V, VR) => VR)( - implicit materialized: Materialized[K, VR, ByteArrayWindowStore] + def aggregate[VR](initializer: => VR, named: Named)(aggregator: (K, V, VR) => VR)(implicit + materialized: Materialized[K, VR, ByteArrayWindowStore] ): KTable[Windowed[K], VR] = new KTable(inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, named, materialized)) @@ -120,8 +120,8 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) { * latest (rolling) aggregate for each key * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#reduce` */ - def reduce(reducer: (V, V) => V)( - implicit materialized: Materialized[K, V, ByteArrayWindowStore] + def reduce(reducer: (V, V) => V)(implicit + materialized: Materialized[K, V, ByteArrayWindowStore] ): KTable[Windowed[K], V] = new KTable(inner.reduce(reducer.asReducer, materialized)) @@ -135,8 +135,8 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) { * latest (rolling) aggregate for each key * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#reduce` */ - def reduce(reducer: (V, V) => V, named: Named)( - implicit materialized: Materialized[K, V, ByteArrayWindowStore] + def reduce(reducer: (V, V) => V, named: Named)(implicit + materialized: Materialized[K, V, ByteArrayWindowStore] ): KTable[Windowed[K], V] = new KTable(inner.reduce(reducer.asReducer, materialized)) } diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/serialization/Serdes.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/serialization/Serdes.scala index c4e7537f924ff..0c72358c15f3e 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/serialization/Serdes.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/serialization/Serdes.scala @@ -57,8 +57,10 @@ object Serdes extends LowPrioritySerdes { } ) - def fromFn[T >: Null](serializer: (String, T) => Array[Byte], - deserializer: (String, Array[Byte]) => Option[T]): Serde[T] = + def fromFn[T >: Null]( + serializer: (String, T) => Array[Byte], + deserializer: (String, Array[Byte]) => Option[T] + ): Serde[T] = JSerdes.serdeFrom( new Serializer[T] { override def serialize(topic: String, data: T): Array[Byte] = serializer(topic, data) @@ -75,13 +77,13 @@ object Serdes extends LowPrioritySerdes { trait LowPrioritySerdes { - implicit val nullSerde: Serde[Null] = { + implicit val nullSerde: Serde[Null] = Serdes.fromFn[Null]( { _: Null => null - }, { _: Array[Byte] => + }, + { _: Array[Byte] => None } ) - } } diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala index 56b1c29378e19..e9577bcf73c6b 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala @@ -54,7 +54,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ val clicksPerRegion: KTable[String, Long] = userClicksStream - // Join the stream against the table. + // Join the stream against the table. .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks)) // Change the stream from -> to -> @@ -98,7 +98,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ val clicksPerRegion: KTable[String, Long] = userClicksStream - // Join the stream against the table. + // Join the stream against the table. .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks)) // Change the stream from -> to -> diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala index ce75f76984f84..325e26ce0d118 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala @@ -287,13 +287,12 @@ class TopologyTest { val textLines = streamBuilder.stream[String, String](inputTopic) val _: KTable[String, Long] = textLines - .transform( - () => - new Transformer[String, String, KeyValue[String, String]] { - override def init(context: ProcessorContext): Unit = () - override def transform(key: String, value: String): KeyValue[String, String] = - new KeyValue(key, value.toLowerCase) - override def close(): Unit = () + .transform(() => + new Transformer[String, String, KeyValue[String, String]] { + override def init(context: ProcessorContext): Unit = () + override def transform(key: String, value: String): KeyValue[String, String] = + new KeyValue(key, value.toLowerCase) + override def close(): Unit = () } ) .groupBy((_, v) => v) @@ -308,13 +307,12 @@ class TopologyTest { val streamBuilder = new StreamsBuilderJ val textLines: KStreamJ[String, String] = streamBuilder.stream[String, String](inputTopic) - val lowered: KStreamJ[String, String] = textLines.transform( - () => - new Transformer[String, String, KeyValue[String, String]] { - override def init(context: ProcessorContext): Unit = () - override def transform(key: String, value: String): KeyValue[String, String] = - new KeyValue(key, value.toLowerCase) - override def close(): Unit = () + val lowered: KStreamJ[String, String] = textLines.transform(() => + new Transformer[String, String, KeyValue[String, String]] { + override def init(context: ProcessorContext): Unit = () + override def transform(key: String, value: String): KeyValue[String, String] = + new KeyValue(key, value.toLowerCase) + override def close(): Unit = () } ) @@ -378,16 +376,20 @@ class TopologyTest { mappedStream .filter((k: String, _: String) => k == "A") - .join(stream2)((v1: String, v2: Int) => v1 + ":" + v2.toString, - JoinWindows.of(Duration.ofMillis(5000)).grace(Duration.ofHours(24)))( + .join(stream2)( + (v1: String, v2: Int) => v1 + ":" + v2.toString, + JoinWindows.of(Duration.ofMillis(5000)).grace(Duration.ofHours(24)) + )( StreamJoined.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, NewSerdes.intSerde) ) .to(JOINED_TOPIC) mappedStream .filter((k: String, _: String) => k == "A") - .join(stream3)((v1: String, v2: String) => v1 + ":" + v2.toString, - JoinWindows.of(Duration.ofMillis(5000)).grace(Duration.ofHours(24)))( + .join(stream3)( + (v1: String, v2: String) => v1 + ":" + v2.toString, + JoinWindows.of(Duration.ofMillis(5000)).grace(Duration.ofHours(24)) + )( StreamJoined.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, NewSerdes.stringSerde) ) .to(JOINED_TOPIC) @@ -457,10 +459,14 @@ class TopologyTest { builder } - assertNotEquals(getTopologyScala.build(props).describe.toString, - getTopologyScala.build(propsNoOptimization).describe.toString) - assertEquals(getTopologyScala.build(propsNoOptimization).describe.toString, - getTopologyJava.build(propsNoOptimization).describe.toString) + assertNotEquals( + getTopologyScala.build(props).describe.toString, + getTopologyScala.build(propsNoOptimization).describe.toString + ) + assertEquals( + getTopologyScala.build(propsNoOptimization).describe.toString, + getTopologyJava.build(propsNoOptimization).describe.toString + ) assertEquals(getTopologyScala.build(props).describe.toString, getTopologyJava.build(props).describe.toString) } diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinScalaIntegrationTestBase.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinScalaIntegrationTestBase.scala index 5b2aa7626825d..984cb74a6e2fd 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinScalaIntegrationTestBase.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinScalaIntegrationTestBase.scala @@ -100,36 +100,44 @@ class StreamToTableJoinScalaIntegrationTestBase extends StreamToTableJoinTestDat p } - def produceNConsume(userClicksTopic: String, - userRegionsTopic: String, - outputTopic: String, - waitTillRecordsReceived: Boolean = true): java.util.List[KeyValue[String, Long]] = { + def produceNConsume( + userClicksTopic: String, + userRegionsTopic: String, + outputTopic: String, + waitTillRecordsReceived: Boolean = true + ): java.util.List[KeyValue[String, Long]] = { import _root_.scala.jdk.CollectionConverters._ // Publish user-region information. val userRegionsProducerConfig: Properties = getUserRegionsProducerConfig() - IntegrationTestUtils.produceKeyValuesSynchronously(userRegionsTopic, - userRegions.asJava, - userRegionsProducerConfig, - mockTime, - false) + IntegrationTestUtils.produceKeyValuesSynchronously( + userRegionsTopic, + userRegions.asJava, + userRegionsProducerConfig, + mockTime, + false + ) // Publish user-click information. val userClicksProducerConfig: Properties = getUserClicksProducerConfig() - IntegrationTestUtils.produceKeyValuesSynchronously(userClicksTopic, - userClicks.asJava, - userClicksProducerConfig, - mockTime, - false) + IntegrationTestUtils.produceKeyValuesSynchronously( + userClicksTopic, + userClicks.asJava, + userClicksProducerConfig, + mockTime, + false + ) if (waitTillRecordsReceived) { // consume and verify result val consumerConfig = getConsumerConfig() - IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig, - outputTopic, - expectedClicksPerRegion.asJava) + IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived( + consumerConfig, + outputTopic, + expectedClicksPerRegion.asJava + ) } else { java.util.Collections.emptyList() }