diff --git a/build.sbt b/build.sbt index 9012a6bcc..92424d9a5 100644 --- a/build.sbt +++ b/build.sbt @@ -4,7 +4,7 @@ name := "kafka-streams-scala" organization := "com.lightbend" -version := "0.2.0" +version := "0.2.1" scalaVersion := Versions.Scala_2_12_Version diff --git a/project/Versions.scala b/project/Versions.scala index 042d24ad0..4e56ded62 100644 --- a/project/Versions.scala +++ b/project/Versions.scala @@ -7,7 +7,7 @@ object Versions { val CuratorVersion = "4.0.0" val MinitestVersion = "2.0.0" val JDKVersion = "1.8" - val Scala_2_12_Version = "2.12.4" + val Scala_2_12_Version = "2.12.5" val Scala_2_11_Version = "2.11.11" val Avro4sVersion = "1.8.3" val CrossScalaVersions = Seq(Scala_2_12_Version, Scala_2_11_Version ) diff --git a/src/main/scala/com/lightbend/kafka/scala/streams/KStreamS.scala b/src/main/scala/com/lightbend/kafka/scala/streams/KStreamS.scala index 15913b830..615b336e7 100644 --- a/src/main/scala/com/lightbend/kafka/scala/streams/KStreamS.scala +++ b/src/main/scala/com/lightbend/kafka/scala/streams/KStreamS.scala @@ -137,7 +137,7 @@ class KStreamS[K, V](val inner: KStream[K, V]) { def join[VT, VR](table: KTableS[K, VT], joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStreamS[K, VR] = - inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner, joined) + inner.join[VT, VR](table.inner, joiner.asValueJoiner, joined) def join[GK, GV, RV](globalKTable: GlobalKTable[GK, GV], keyValueMapper: (K, V) => GK, @@ -165,7 +165,7 @@ class KStreamS[K, V](val inner: KStream[K, V]) { windows: JoinWindows)(implicit joined: Joined[K, V, VO]): KStreamS[K, VR] = inner.outerJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined) - def merge(stream: KStreamS[K, V]): KStreamS[K, V] = inner.merge(stream) + def merge(stream: KStreamS[K, V]): KStreamS[K, V] = inner.merge(stream.inner) def peek(action: (K, V) => Unit): KStreamS[K, V] = { inner.peek(action(_,_)) diff --git a/src/test/scala/com/lightbend/kafka/scala/streams/KafkaStreamsMergeTest.scala b/src/test/scala/com/lightbend/kafka/scala/streams/KafkaStreamsMergeTest.scala new file mode 100644 index 000000000..437957f41 --- /dev/null +++ b/src/test/scala/com/lightbend/kafka/scala/streams/KafkaStreamsMergeTest.scala @@ -0,0 +1,129 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package com.lightbend.kafka.scala.streams + +import java.util.Properties +import java.util.regex.Pattern + +import com.lightbend.kafka.scala.server.{KafkaLocalServer, MessageListener, MessageSender, RecordProcessorTrait} +import minitest.TestSuite +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.serialization._ +import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig} +import ImplicitConversions._ +import com.typesafe.scalalogging.LazyLogging + +object KafkaStreamsMergeTest extends TestSuite[KafkaLocalServer] with WordCountMergeTestData with LazyLogging { + + override def setup(): KafkaLocalServer = { + val s = KafkaLocalServer(cleanOnStart = true, Some(localStateDir)) + s.start() + s + } + + override def tearDown(server: KafkaLocalServer): Unit = { + server.stop() + } + + test("should count words") { server => + + server.createTopic(inputTopic1) + server.createTopic(inputTopic2) + server.createTopic(outputTopic) + + // + // Step 1: Configure and start the processor topology. + // + import DefaultSerdes._ + + val streamsConfiguration = new Properties() + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, s"wordcount-${scala.util.Random.nextInt(100)}") + streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "wordcountgroup") + + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, localStateDir) + + val builder = new StreamsBuilderS() + + val textLines1 = builder.stream[String, String](inputTopic1) + val textLines2 = builder.stream[String, String](inputTopic2) + + val textLines = textLines1.merge(textLines2) + + val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS) + + val wordCounts: KTableS[String, Long] = + textLines.flatMapValues(v => pattern.split(v.toLowerCase)) + .groupBy((k, v) => v) + .count() + + wordCounts.toStream.to(outputTopic) + + val streams = new KafkaStreams(builder.build(), streamsConfiguration) + streams.start() + + // + // Step 2: Produce some input data to the input topics. + // + val sender = MessageSender[String, String](brokers, classOf[StringSerializer].getName, classOf[StringSerializer].getName) + val mvals1 = sender.batchWriteValue(inputTopic1, inputValues) + val mvals2 = sender.batchWriteValue(inputTopic2, inputValues) + + // + // Step 3: Verify the application's output data. + // + val listener = MessageListener(brokers, outputTopic, "wordcountgroup", + classOf[StringDeserializer].getName, + classOf[LongDeserializer].getName, + new RecordProcessor + ) + + val l = listener.waitUntilMinKeyValueRecordsReceived(expectedWordCounts.size, 30000) + + assertEquals(l.sortBy(_.key), expectedWordCounts.sortBy(_.key)) + + streams.close() + } + + class RecordProcessor extends RecordProcessorTrait[String, Long] { + override def processRecord(record: ConsumerRecord[String, Long]): Unit = { + // logger.info(s"Get Message $record") + } + } + +} + +trait WordCountMergeTestData { + val inputTopic1 = s"inputTopic1.${scala.util.Random.nextInt(100)}" + val inputTopic2 = s"inputTopic2.${scala.util.Random.nextInt(100)}" + val outputTopic = s"outputTpic.${scala.util.Random.nextInt(100)}" + val brokers = "localhost:9092" + val localStateDir = "local_state_data" + + val inputValues = List( + "Hello Kafka Streams", + "All streams lead to Kafka", + "Join Kafka Summit", + "И теперь пошли русские слова" + ) + + val expectedWordCounts: List[KeyValue[String, Long]] = List( + new KeyValue("hello", 2L), + new KeyValue("all", 2L), + new KeyValue("streams", 4L), + new KeyValue("lead", 2L), + new KeyValue("to", 2L), + new KeyValue("join", 2L), + new KeyValue("kafka", 6L), + new KeyValue("summit", 2L), + new KeyValue("и", 2L), + new KeyValue("теперь", 2L), + new KeyValue("пошли", 2L), + new KeyValue("русские", 2L), + new KeyValue("слова", 2L) + ) +} + +