diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java index be54bafca2b42..f96d6974a1cd8 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java @@ -47,32 +47,31 @@ * using specific data types (here: JSON POJO; but can also be Avro specific bindings, etc.) for serdes * in Kafka Streams. * - * In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input" + *

In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input" * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format * is JSON string representing a record in the stream or table, to compute the number of pageviews per user region. * - * Before running this example you must create the input topics and the output topic (e.g. via + *

Before running this example you must create the input topics and the output topic (e.g. via * bin/kafka-topics --create ...), and write some data to the input topics (e.g. via * bin/kafka-console-producer). Otherwise you won't see any data arriving in the output topic. * - * The inputs for this example are: + *

The inputs for this example are: * - Topic: streams-pageview-input * Key Format: (String) USER_ID * Value Format: (JSON) {"_t": "pv", "user": (String USER_ID), "page": (String PAGE_ID), "timestamp": (long ms TIMESTAMP)} - * + *

* - Topic: streams-userprofile-input * Key Format: (String) USER_ID * Value Format: (JSON) {"_t": "up", "region": (String REGION), "timestamp": (long ms TIMESTAMP)} * - * To observe the results, read the output topic (e.g., via bin/kafka-console-consumer) + *

To observe the results, read the output topic (e.g., via bin/kafka-console-consumer) * - Topic: streams-pageviewstats-typed-output * Key Format: (JSON) {"_t": "wpvbr", "windowStart": (long ms WINDOW_TIMESTAMP), "region": (String REGION)} * Value Format: (JSON) {"_t": "rc", "count": (long REGION_COUNT), "region": (String REGION)} * - * Note, the "_t" field is necessary to help Jackson identify the correct class for deserialization in the + *

Note, the "_t" field is necessary to help Jackson identify the correct class for deserialization in the * generic {@link JSONSerde}. If you instead specify a specific serde per class, you won't need the extra "_t" field. */ -@SuppressWarnings({"WeakerAccess", "unused"}) public class PageViewTypedDemo { /** diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java index 8fc874488abe5..70b70d5714ffd 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java @@ -45,11 +45,11 @@ * using general data types (here: JSON; but can also be Avro generic bindings, etc.) for serdes * in Kafka Streams. * - * In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input" + *

In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input" * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format * is JSON string representing a record in the stream or table, to compute the number of pageviews per user region. * - * Before running this example you must create the input topics and the output topic (e.g. via + *

Before running this example you must create the input topics and the output topic (e.g. via * bin/kafka-topics.sh --create ...), and write some data to the input topics (e.g. via * bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic. */ diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java index 860f2ffa7f9b6..2fbd8370deafb 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java @@ -29,10 +29,10 @@ * Demonstrates, using the high-level KStream DSL, how to read data from a source (input) topic and how to * write data to a sink (output) topic. * - * In this example, we implement a simple "pipe" program that reads from a source topic "streams-plaintext-input" + *

In this example, we implement a simple "pipe" program that reads from a source topic "streams-plaintext-input" * and writes the data as-is (i.e. unmodified) into a sink topic "streams-pipe-output". * - * Before running this example you must create the input topic and the output topic (e.g. via + *

Before running this example you must create the input topic and the output topic (e.g. via * bin/kafka-topics.sh --create ...), and write some data to the input topic (e.g. via * bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic. */ @@ -42,8 +42,8 @@ public static void main(final String[] args) { final Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java index 6e40fa03066dc..0f2a727aed0cd 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java @@ -37,26 +37,26 @@ * which ingests temperature value to compute the maximum value in the latest TEMPERATURE_WINDOW_SIZE seconds (which * is 5 seconds) and send a new message if it exceeds the TEMPERATURE_THRESHOLD (which is 20) * - * In this example, the input stream reads from a topic named "iot-temperature", where the values of messages + *

In this example, the input stream reads from a topic named "iot-temperature", where the values of messages * represent temperature values; using a TEMPERATURE_WINDOW_SIZE seconds "tumbling" window, the maximum value is processed and * sent to a topic named "iot-temperature-max" if it exceeds the TEMPERATURE_THRESHOLD. * - * Before running this example you must create the input topic for temperature values in the following way : + *

Before running this example you must create the input topic for temperature values in the following way : * - * bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic iot-temperature + *

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic iot-temperature * - * and at same time create the output topic for filtered values : + *

and at same time create the output topic for filtered values : * - * bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic iot-temperature-max + *

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic iot-temperature-max * - * After that, a console consumer can be started in order to read filtered values from the "iot-temperature-max" topic : + *

After that, a console consumer can be started in order to read filtered values from the "iot-temperature-max" topic : * * bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic iot-temperature-max --from-beginning * - * On the other side, a console producer can be used for sending temperature values (which needs to be integers) + *

On the other side, a console producer can be used for sending temperature values (which needs to be integers) * to "iot-temperature" by typing them on the console : * - * bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic iot-temperature + *

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic iot-temperature * > 10 * > 15 * > 22 @@ -73,8 +73,8 @@ public static void main(final String[] args) { final Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-temperature"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java index d290c660bbf54..1664445f0980d 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java @@ -62,8 +62,8 @@ static Properties getStreamsConfig(final String[] args) throws IOException { props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.putIfAbsent(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); - props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data // Note: To re-run the demo, you need to use the offset reset tool: diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java index 6204c422bc0a7..594c7d4f20d83 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java @@ -109,8 +109,8 @@ public static void main(final String[] args) throws IOException { props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor"); props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.putIfAbsent(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); - props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java index d347bb58681ad..bf532258bbd6c 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java @@ -100,9 +100,6 @@ public void process(final Record record) { } } } - - @Override - public void close() {} }; } @@ -128,8 +125,8 @@ public static void main(final String[] args) throws IOException { props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-transformer"); props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.putIfAbsent(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); - props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java index 41b61e34f173a..140ccc73a63bd 100644 --- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java +++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java @@ -54,8 +54,8 @@ public class DeveloperGuideTesting { private TestOutputTopic outputTopic; private KeyValueStore store; - private Serde stringSerde = new Serdes.StringSerde(); - private Serde longSerde = new Serdes.LongSerde(); + private final Serde stringSerde = new Serdes.StringSerde(); + private final Serde longSerde = new Serdes.LongSerde(); @BeforeEach public void setup() { @@ -72,8 +72,8 @@ public void setup() { // setup test driver final Properties props = new Properties(); - props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName()); + props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); + props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.LongSerde.class.getName()); testDriver = new TopologyTestDriver(topology, props); // setup test topics @@ -155,7 +155,6 @@ public static class CustomMaxAggregator implements Processor context; private KeyValueStore store; - @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context) { this.context = context;