Skip to content

Commit

Permalink
KAFKA-3066: Demo Examples for Kafka Streams
Browse files Browse the repository at this point in the history
Author: Guozhang Wang <[email protected]>

Reviewers: Ewen Cheslack-Postava <[email protected]>

Closes apache#797 from guozhangwang/K3066
  • Loading branch information
guozhangwang authored and ewencp committed Jan 22, 2016
1 parent a19729f commit c197113
Show file tree
Hide file tree
Showing 33 changed files with 670 additions and 252 deletions.
25 changes: 25 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ project(':streams') {

dependencies {
compile project(':clients')
compile project(':connect:json') // this dependency should be removed after we unify data API
compile libs.slf4jlog4j
compile libs.rocksDBJni
compile libs.zkclient // this dependency should be removed after KIP-4
Expand Down Expand Up @@ -542,6 +543,30 @@ project(':streams') {
}
}

// Build configuration for the Kafka Streams example applications sub-project.
project(':streams:examples') {
// Base name used for the archives this project produces.
archivesBaseName = "kafka-streams-examples"

dependencies {
compile project(':streams')
compile project(':connect:json') // this dependency should be removed after we unify data API
}

// Javadoc generation is switched off for this project.
javadoc {
enabled = false
}

// Copies the files of the runtime configuration, except those matching 'kafka-streams*',
// into a Scala-version-specific "dependant-libs" directory under the build directory.
tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.runtime) {
exclude('kafka-streams*')
}
into "$buildDir/dependant-libs-${versions.scala}"
}

// Building the jar first runs the dependency copy task defined above.
jar {
dependsOn 'copyDependantLibs'
}
}

project(':log4j-appender') {
archivesBaseName = "kafka-log4j-appender"

Expand Down
2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.

include 'core', 'examples', 'clients', 'tools', 'streams', 'log4j-appender',
include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'log4j-appender',
'connect:api', 'connect:runtime', 'connect:json', 'connect:file'
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.streams.examples.pageview;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

/**
* JSON deserializer for Jackson's JsonNode tree model. Using the tree model allows it to work with arbitrarily
* structured data without having associated Java classes. This deserializer also supports Connect schemas.
*/
public class JsonPOJODeserializer<T> implements Deserializer<T> {
private ObjectMapper objectMapper = new ObjectMapper();

private Class<T> tClass;

/**
* Default constructor needed by Kafka
*/
public JsonPOJODeserializer() {
}

@SuppressWarnings("unchecked")
@Override
public void configure(Map<String, ?> props, boolean isKey) {
tClass = (Class<T>) props.get("JsonPOJOClass");
}

@Override
public T deserialize(String topic, byte[] bytes) {
if (bytes == null)
return null;

T data;
try {
data = objectMapper.readValue(bytes, tClass);
} catch (Exception e) {
throw new SerializationException(e);
}

return data;
}

@Override
public void close() {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.streams.examples.pageview;


import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

/**
 * JSON serializer that writes an arbitrary object to bytes using Jackson's {@link ObjectMapper}.
 *
 * @param <T> the type of the objects being serialized
 */
public class JsonPOJOSerializer<T> implements Serializer<T> {
    private final ObjectMapper mapper = new ObjectMapper();

    // Captured from the configuration; this class stores it but does not read it back.
    private Class<T> pojoClass;

    /**
     * Default constructor needed by Kafka
     */
    public JsonPOJOSerializer() {
    }

    /**
     * Stores the value of the {@code "JsonPOJOClass"} configuration entry.
     *
     * @param props configuration map
     * @param isKey unused by this serializer
     */
    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map<String, ?> props, boolean isKey) {
        final Object configuredClass = props.get("JsonPOJOClass");
        this.pojoClass = (Class<T>) configuredClass;
    }

    /**
     * Serializes the given object to JSON bytes.
     *
     * @param topic unused by this serializer
     * @param data  the object to serialize, or {@code null}
     * @return the JSON bytes, or {@code null} if {@code data} is {@code null}
     * @throws SerializationException if the object cannot be written as JSON
     */
    @Override
    public byte[] serialize(String topic, T data) {
        byte[] payload = null;

        if (data != null) {
            try {
                payload = mapper.writeValueAsBytes(data);
            } catch (Exception e) {
                throw new SerializationException("Error serializing JSON message", e);
            }
        }

        return payload;
    }

    /**
     * Nothing to release.
     */
    @Override
    public void close() {
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.streams.examples.pageview;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Count;
import org.apache.kafka.streams.kstream.HoppingWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Windowed;

import java.util.Properties;

/**
 * Demo job that counts page views per region over hopping windows.
 *
 * <p>Page views read from {@code streams-pageview-input} are re-keyed by user, left-joined with the
 * user profiles read from {@code streams-userprofile-input} to obtain each view's region, counted
 * per region and window, and the resulting counts are written to
 * {@code streams-pageviewstats-output} as JSON.
 */
public class PageViewTypedJob {

    // POJO classes
    public static class PageView {
        public String user;
        public String page;
    }

    public static class UserProfile {
        public String user;
        public String region;
    }

    public static class PageViewByRegion {
        public String user;
        public String page;
        public String region;
    }

    public static class WindowedPageViewByRegion {
        public long windowStart;
        public String region;
    }

    public static class RegionCount {
        public long count;
        public String region;
    }

    /**
     * Builds the page-view topology and starts the streams instance.
     *
     * @param args unused
     * @throws Exception declared for any failure while building or starting the job
     */
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonPOJOSerializer.class);
        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonPOJODeserializer.class);
        // NOTE(review): JsonPOJODeserializer reads its target class from a "JsonPOJOClass" config
        // entry, and no such entry is put into props here - confirm how the POJO class is
        // expected to reach the default value deserializer.

        KStreamBuilder builder = new KStreamBuilder();

        final Serializer<String> stringSerializer = new StringSerializer();
        final Deserializer<String> stringDeserializer = new StringDeserializer();
        final Serializer<Long> longSerializer = new LongSerializer();
        final Deserializer<Long> longDeserializer = new LongDeserializer();

        KStream<String, PageView> views = builder.stream("streams-pageview-input");

        // Re-key every page view by the user that produced it so it can be joined with the profiles.
        KStream<String, PageView> viewsByUser = views.map((dummy, record) -> new KeyValue<>(record.user, record));

        KTable<String, UserProfile> users = builder.table("streams-userprofile-input");

        KStream<WindowedPageViewByRegion, RegionCount> regionCount = viewsByUser
            .leftJoin(users, (view, profile) -> {
                PageViewByRegion viewByRegion = new PageViewByRegion();
                viewByRegion.user = view.user;
                viewByRegion.page = view.page;

                // A left join also emits views whose user has no profile; guard against the
                // missing profile instead of failing with a NullPointerException.
                if (profile != null) {
                    viewByRegion.region = profile.region;
                } else {
                    viewByRegion.region = "UNKNOWN";
                }

                return viewByRegion;
            })
            // Re-key by region so the aggregation below counts views per region.
            .map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion))
            // Window size is 7 days expressed in milliseconds.
            .aggregateByKey(new Count<String, PageViewByRegion>(), HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000),
                    stringSerializer, longSerializer,
                    stringDeserializer, longDeserializer)
            .toStream()
            // Convert the windowed key and the raw count into POJOs for the JSON output.
            .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
                @Override
                public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) {
                    WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion();
                    wViewByRegion.windowStart = key.window().start();
                    wViewByRegion.region = key.value();

                    RegionCount rCount = new RegionCount();
                    rCount.region = key.value();
                    rCount.count = value;

                    return new KeyValue<>(wViewByRegion, rCount);
                }
            });

        // write to the result topic
        regionCount.to("streams-pageviewstats-output", new JsonPOJOSerializer<>(), new JsonPOJOSerializer<>());

        KafkaStreams kstream = new KafkaStreams(builder, props);
        kstream.start();
    }
}
Loading

0 comments on commit c197113

Please sign in to comment.