-
Notifications
You must be signed in to change notification settings - Fork 41
/
Copy pathWordCountLambdaIntegrationTest.java
154 lines (133 loc) · 6.56 KB
/
WordCountLambdaIntegrationTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
/**
* Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.confluent.examples.streams;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import io.confluent.examples.streams.kafka.EmbeddedSingleNodeKafkaCluster;
import static org.assertj.core.api.Assertions.assertThat;
/**
* End-to-end integration test based on {@link WordCountLambdaExample}, using an embedded Kafka
* cluster.
*
* See {@link WordCountLambdaExample} for further documentation.
*
* See {@link WordCountScalaIntegrationTest} for the equivalent Scala example.
*
* Note: This example uses lambda expressions and thus works with Java 8+ only.
*/
public class WordCountLambdaIntegrationTest {
@ClassRule
public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
private static final String inputTopic = "inputTopic";
private static final String outputTopic = "outputTopic";
@BeforeClass
public static void startKafkaCluster() throws Exception {
CLUSTER.createTopic(inputTopic);
CLUSTER.createTopic(outputTopic);
}
@Test
public void shouldCountWords() throws Exception {
List<String> inputValues = Arrays.asList(
"Hello Kafka Streams",
"All streams lead to Kafka",
"Join Kafka Summit"
);
List<KeyValue<String, Long>> expectedWordCounts = Arrays.asList(
new KeyValue<>("hello", 1L),
new KeyValue<>("kafka", 1L),
new KeyValue<>("streams", 1L),
new KeyValue<>("all", 1L),
new KeyValue<>("streams", 2L),
new KeyValue<>("lead", 1L),
new KeyValue<>("to", 1L),
new KeyValue<>("kafka", 2L),
new KeyValue<>("join", 1L),
new KeyValue<>("kafka", 3L),
new KeyValue<>("summit", 1L)
);
//
// Step 1: Configure and start the processor topology.
//
// This code is identical to what's shown in WordCountLambdaExample, with the exception of
// the call to `purgeLocalStreamsState()`, which we need only for cleaning up previous runs of
// this integration test.
//
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-lambda-integration-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zookeeperConnect());
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Explicitly place the state directory under /tmp so that we can remove it via
// `purgeLocalStreamsState` below. Once Streams is updated to expose the effective
// StreamsConfig configuration (so we can retrieve whatever state directory Streams came up
// with automatically) we don't need to set this anymore and can update `purgeLocalStreamsState`
// accordingly.
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
// Remove any state from previous test runs
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textLines = builder.stream(inputTopic);
KStream<String, Long> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, word) -> new KeyValue<>(word, word))
.countByKey(stringSerde, "Counts")
.toStream();
wordCounts.to(stringSerde, longSerde, outputTopic);
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();
//
// Step 2: Produce some input data to the input topic.
//
Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
IntegrationTestUtils.produceValuesSynchronously(inputTopic, inputValues, producerConfig);
//
// Step 3: Verify the application's output data.
//
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "wordcount-lambda-integration-test-standard-consumer");
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
List<KeyValue<String, Long>> actualWordCounts = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
outputTopic, expectedWordCounts.size());
streams.close();
assertThat(actualWordCounts).containsExactlyElementsOf(expectedWordCounts);
}
}