This repository has been archived by the owner on May 25, 2023. It is now read-only.

Commit

1. Fixes #63
2. Fixes #64
3. Updated Scala version to 2.12.5 and project version to 0.2.1
debasishg committed Apr 3, 2018
1 parent 2bb39d1 commit 41a0f90
Showing 4 changed files with 133 additions and 4 deletions.
2 changes: 1 addition & 1 deletion build.sbt
@@ -4,7 +4,7 @@ name := "kafka-streams-scala"

organization := "com.lightbend"

version := "0.2.0"
version := "0.2.1"

scalaVersion := Versions.Scala_2_12_Version

2 changes: 1 addition & 1 deletion project/Versions.scala
@@ -7,7 +7,7 @@ object Versions {
  val CuratorVersion = "4.0.0"
  val MinitestVersion = "2.0.0"
  val JDKVersion = "1.8"
-  val Scala_2_12_Version = "2.12.4"
+  val Scala_2_12_Version = "2.12.5"
  val Scala_2_11_Version = "2.11.11"
  val Avro4sVersion = "1.8.3"
  val CrossScalaVersions = Seq(Scala_2_12_Version, Scala_2_11_Version)
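For context on the version bump: build.sbt does not hard-code Scala versions but reads them from project/Versions.scala (sbt compiles files under project/ before evaluating the build), so changing Scala_2_12_Version in one place updates every usage. A minimal sketch of the wiring; the crossScalaVersions line is an assumption about the unshown rest of build.sbt, not part of this diff:

// build.sbt (sketch)
scalaVersion := Versions.Scala_2_12_Version
crossScalaVersions := Versions.CrossScalaVersions  // assumed wiring; not shown in this diff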
4 changes: 2 additions & 2 deletions KStreamS.scala
@@ -137,7 +137,7 @@ class KStreamS[K, V](val inner: KStream[K, V]) {

  def join[VT, VR](table: KTableS[K, VT],
    joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStreamS[K, VR] =
-   inner.leftJoin[VT, VR](table.inner, joiner.asValueJoiner, joined)
+   inner.join[VT, VR](table.inner, joiner.asValueJoiner, joined)

  def join[GK, GV, RV](globalKTable: GlobalKTable[GK, GV],
    keyValueMapper: (K, V) => GK,
@@ -165,7 +165,7 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
    windows: JoinWindows)(implicit joined: Joined[K, V, VO]): KStreamS[K, VR] =
    inner.outerJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)

- def merge(stream: KStreamS[K, V]): KStreamS[K, V] = inner.merge(stream)
+ def merge(stream: KStreamS[K, V]): KStreamS[K, V] = inner.merge(stream.inner)

  def peek(action: (K, V) => Unit): KStreamS[K, V] = {
    inner.peek(action(_,_))
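These two one-line fixes are the substance of the commit (issues #63 and #64): the stream-table join now delegates to the underlying KStream.join instead of leftJoin, restoring inner-join semantics, and merge now unwraps the Scala wrapper and passes the underlying Java stream (stream.inner) to KStream.merge. A minimal sketch of the corrected join semantics; the enrich helper, the click/user names, and the explicit Joined instance are hypothetical, not part of the commit:

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.Joined

// Hypothetical helper: enrich a click stream with a user table via an inner join.
def enrich(clicks: KStreamS[String, String],
           users: KTableS[String, String]): KStreamS[String, String] = {
  implicit val joined: Joined[String, String, String] =
    Joined.`with`(Serdes.String(), Serdes.String(), Serdes.String())
  // After the fix: clicks whose key has no entry in `users` are dropped.
  // Before the fix, the accidental leftJoin would have emitted them with a null user.
  clicks.join(users, (click: String, user: String) => s"$user clicked $click")
}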
129 changes: 129 additions & 0 deletions KafkaStreamsMergeTest.scala (new file)
@@ -0,0 +1,129 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/

package com.lightbend.kafka.scala.streams

import java.util.Properties
import java.util.regex.Pattern

import com.lightbend.kafka.scala.server.{KafkaLocalServer, MessageListener, MessageSender, RecordProcessorTrait}
import minitest.TestSuite
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
import ImplicitConversions._
import com.typesafe.scalalogging.LazyLogging

object KafkaStreamsMergeTest extends TestSuite[KafkaLocalServer] with WordCountMergeTestData with LazyLogging {

  override def setup(): KafkaLocalServer = {
    val s = KafkaLocalServer(cleanOnStart = true, Some(localStateDir))
    s.start()
    s
  }

  override def tearDown(server: KafkaLocalServer): Unit = {
    server.stop()
  }

  test("should count words") { server =>

    server.createTopic(inputTopic1)
    server.createTopic(inputTopic2)
    server.createTopic(outputTopic)

    //
    // Step 1: Configure and start the processor topology.
    //
    import DefaultSerdes._

    val streamsConfiguration = new Properties()
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, s"wordcount-${scala.util.Random.nextInt(100)}")
    streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "wordcountgroup")

    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, localStateDir)

    val builder = new StreamsBuilderS()

    val textLines1 = builder.stream[String, String](inputTopic1)
    val textLines2 = builder.stream[String, String](inputTopic2)

    // Merge the two input streams into one; this exercises the merge fix above.
    val textLines = textLines1.merge(textLines2)

    val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)

    // Split each line into words, re-key every record by its word, and count per word.
    val wordCounts: KTableS[String, Long] =
      textLines.flatMapValues(v => pattern.split(v.toLowerCase))
        .groupBy((k, v) => v)
        .count()

    wordCounts.toStream.to(outputTopic)

    val streams = new KafkaStreams(builder.build(), streamsConfiguration)
    streams.start()

    //
    // Step 2: Produce some input data to the input topics.
    //
    val sender = MessageSender[String, String](brokers, classOf[StringSerializer].getName, classOf[StringSerializer].getName)
    val mvals1 = sender.batchWriteValue(inputTopic1, inputValues)
    val mvals2 = sender.batchWriteValue(inputTopic2, inputValues)

    //
    // Step 3: Verify the application's output data.
    //
    val listener = MessageListener(brokers, outputTopic, "wordcountgroup",
      classOf[StringDeserializer].getName,
      classOf[LongDeserializer].getName,
      new RecordProcessor
    )

    val l = listener.waitUntilMinKeyValueRecordsReceived(expectedWordCounts.size, 30000)

    assertEquals(l.sortBy(_.key), expectedWordCounts.sortBy(_.key))

    streams.close()
  }

  class RecordProcessor extends RecordProcessorTrait[String, Long] {
    override def processRecord(record: ConsumerRecord[String, Long]): Unit = {
      // logger.info(s"Get Message $record")
    }
  }

}

trait WordCountMergeTestData {
  val inputTopic1 = s"inputTopic1.${scala.util.Random.nextInt(100)}"
  val inputTopic2 = s"inputTopic2.${scala.util.Random.nextInt(100)}"
  val outputTopic = s"outputTopic.${scala.util.Random.nextInt(100)}"
  val brokers = "localhost:9092"
  val localStateDir = "local_state_data"

  val inputValues = List(
    "Hello Kafka Streams",
    "All streams lead to Kafka",
    "Join Kafka Summit",
    // Russian: "And now come the Russian words" -- exercises Unicode-aware word splitting.
    "И теперь пошли русские слова"
  )

  // Each count below is the word's frequency in inputValues doubled, because the
  // same values are produced to both merged input topics.
  val expectedWordCounts: List[KeyValue[String, Long]] = List(
    new KeyValue("hello", 2L),
    new KeyValue("all", 2L),
    new KeyValue("streams", 4L),
    new KeyValue("lead", 2L),
    new KeyValue("to", 2L),
    new KeyValue("join", 2L),
    new KeyValue("kafka", 6L),
    new KeyValue("summit", 2L),
    new KeyValue("и", 2L),
    new KeyValue("теперь", 2L),
    new KeyValue("пошли", 2L),
    new KeyValue("русские", 2L),
    new KeyValue("слова", 2L)
  )
}
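As a sanity check on expectedWordCounts: each expected count is the word's frequency across inputValues, doubled, because the same batch is written to both merged input topics. A small recomputation sketch (not part of the test) over the trait's own data:

// Tokenize exactly as the topology does, count occurrences, then double (two input topics).
val pattern = java.util.regex.Pattern.compile("\\W+", java.util.regex.Pattern.UNICODE_CHARACTER_CLASS)
val expected: Map[String, Long] =
  inputValues
    .flatMap(line => pattern.split(line.toLowerCase).toList)
    .groupBy(identity)
    .map { case (word, ws) => word -> ws.size.toLong * 2 }
// e.g. "kafka" appears three times per batch, so expected("kafka") == 6L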

