MINOR: cleanup some compiler warnings in Kafka Streams examples #14547

Merged 1 commit on Oct 14, 2023
PageViewTypedDemo.java
@@ -47,32 +47,31 @@
  * using specific data types (here: JSON POJO; but can also be Avro specific bindings, etc.) for serdes
  * in Kafka Streams.
  *
- * In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input"
+ * <p>In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input"
  * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format
  * is JSON string representing a record in the stream or table, to compute the number of pageviews per user region.
  *
- * Before running this example you must create the input topics and the output topic (e.g. via
+ * <p>Before running this example you must create the input topics and the output topic (e.g. via
  * bin/kafka-topics --create ...), and write some data to the input topics (e.g. via
  * bin/kafka-console-producer). Otherwise you won't see any data arriving in the output topic.
  *
- * The inputs for this example are:
+ * <p>The inputs for this example are:
  * - Topic: streams-pageview-input
  *   Key Format: (String) USER_ID
  *   Value Format: (JSON) {"_t": "pv", "user": (String USER_ID), "page": (String PAGE_ID), "timestamp": (long ms TIMESTAMP)}
- *
+ * <p>
  * - Topic: streams-userprofile-input
  *   Key Format: (String) USER_ID
  *   Value Format: (JSON) {"_t": "up", "region": (String REGION), "timestamp": (long ms TIMESTAMP)}
  *
- * To observe the results, read the output topic (e.g., via bin/kafka-console-consumer)
+ * <p>To observe the results, read the output topic (e.g., via bin/kafka-console-consumer)
  * - Topic: streams-pageviewstats-typed-output
  *   Key Format: (JSON) {"_t": "wpvbr", "windowStart": (long ms WINDOW_TIMESTAMP), "region": (String REGION)}
  *   Value Format: (JSON) {"_t": "rc", "count": (long REGION_COUNT), "region": (String REGION)}
  *
- * Note, the "_t" field is necessary to help Jackson identify the correct class for deserialization in the
+ * <p>Note, the "_t" field is necessary to help Jackson identify the correct class for deserialization in the
  * generic {@link JSONSerde}. If you instead specify a specific serde per class, you won't need the extra "_t" field.
  */
 @SuppressWarnings({"WeakerAccess", "unused"})
 public class PageViewTypedDemo {

     /**
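The "_t" field described in this Javadoc is an ordinary Jackson type discriminator. Below is a minimal, self-contained sketch of the mechanism; the class and field names are illustrative, not the demo's actual POJO definitions.

    import com.fasterxml.jackson.annotation.JsonSubTypes;
    import com.fasterxml.jackson.annotation.JsonTypeInfo;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.io.IOException;

    public class TypeTagSketch {

        // Jackson writes/reads the "_t" property to choose the concrete subtype.
        @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "_t")
        @JsonSubTypes({
            @JsonSubTypes.Type(value = PageView.class, name = "pv"),
            @JsonSubTypes.Type(value = UserProfile.class, name = "up")
        })
        interface Tagged {}

        static class PageView implements Tagged {
            public String user;
            public String page;
            public long timestamp;
        }

        static class UserProfile implements Tagged {
            public String region;
            public long timestamp;
        }

        public static void main(final String[] args) throws IOException {
            final String json = "{\"_t\":\"pv\",\"user\":\"alice\",\"page\":\"home\",\"timestamp\":1}";
            // The "_t" value "pv" routes this JSON to the PageView class.
            final Tagged record = new ObjectMapper().readValue(json, Tagged.class);
            System.out.println(record.getClass().getSimpleName()); // prints PageView
        }
    }

With one serde per concrete class, as the comment notes, the discriminator becomes unnecessary because the target type is fixed up front.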
PageViewUntypedDemo.java
@@ -45,11 +45,11 @@
  * using general data types (here: JSON; but can also be Avro generic bindings, etc.) for serdes
  * in Kafka Streams.
  *
- * In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input"
+ * <p>In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input"
  * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format
  * is JSON string representing a record in the stream or table, to compute the number of pageviews per user region.
  *
- * Before running this example you must create the input topics and the output topic (e.g. via
+ * <p>Before running this example you must create the input topics and the output topic (e.g. via
  * bin/kafka-topics.sh --create ...), and write some data to the input topics (e.g. via
  * bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
  */
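The untyped variant manipulates a generic JSON tree rather than bound POJOs, so a single serde for JsonNode suffices. A hedged sketch of assembling such a serde from a serializer/deserializer pair; this mirrors the demo's general approach, but the exact wiring is assumed:

    import com.fasterxml.jackson.databind.JsonNode;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.serialization.Serializer;
    import org.apache.kafka.connect.json.JsonDeserializer;
    import org.apache.kafka.connect.json.JsonSerializer;

    public class JsonSerdeSketch {
        // Serdes.serdeFrom pairs a Serializer and a Deserializer into one Serde,
        // here for untyped Jackson JsonNode values.
        static Serde<JsonNode> jsonSerde() {
            final Serializer<JsonNode> serializer = new JsonSerializer();
            final Deserializer<JsonNode> deserializer = new JsonDeserializer();
            return Serdes.serdeFrom(serializer, deserializer);
        }
    }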
PipeDemo.java
@@ -29,10 +29,10 @@
  * Demonstrates, using the high-level KStream DSL, how to read data from a source (input) topic and how to
  * write data to a sink (output) topic.
  *
- * In this example, we implement a simple "pipe" program that reads from a source topic "streams-plaintext-input"
+ * <p>In this example, we implement a simple "pipe" program that reads from a source topic "streams-plaintext-input"
  * and writes the data as-is (i.e. unmodified) into a sink topic "streams-pipe-output".
  *
- * Before running this example you must create the input topic and the output topic (e.g. via
+ * <p>Before running this example you must create the input topic and the output topic (e.g. via
  * bin/kafka-topics.sh --create ...), and write some data to the input topic (e.g. via
  * bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
  */
@@ -42,8 +42,8 @@ public static void main(final String[] args) {
         final Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
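The serde lines are the cleanup proper: Serdes.String() constructs a StringSerde instance only so that getClass() can be called on it, whereas the class literal expresses the same configuration directly. A small sketch contrasting the two forms, which configure the identical serde:

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsConfig;

    public class SerdeConfigSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();

            // Old form: instantiates a StringSerde just to ask for its runtime class.
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            // New form: the class literal carries the same information with no
            // instantiation, since Serdes.String() returns a Serdes.StringSerde anyway.
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        }
    }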
TemperatureDemo.java
@@ -37,26 +37,26 @@
  * which ingests temperature value to compute the maximum value in the latest TEMPERATURE_WINDOW_SIZE seconds (which
  * is 5 seconds) and send a new message if it exceeds the TEMPERATURE_THRESHOLD (which is 20)
  *
- * In this example, the input stream reads from a topic named "iot-temperature", where the values of messages
+ * <p>In this example, the input stream reads from a topic named "iot-temperature", where the values of messages
  * represent temperature values; using a TEMPERATURE_WINDOW_SIZE seconds "tumbling" window, the maximum value is processed and
  * sent to a topic named "iot-temperature-max" if it exceeds the TEMPERATURE_THRESHOLD.
  *
- * Before running this example you must create the input topic for temperature values in the following way :
+ * <p>Before running this example you must create the input topic for temperature values in the following way :
  *
- * bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic iot-temperature
+ * <p>bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic iot-temperature
  *
- * and at same time create the output topic for filtered values :
+ * <p>and at same time create the output topic for filtered values :
  *
- * bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic iot-temperature-max
+ * <p>bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic iot-temperature-max
  *
- * After that, a console consumer can be started in order to read filtered values from the "iot-temperature-max" topic :
+ * <p>After that, a console consumer can be started in order to read filtered values from the "iot-temperature-max" topic :
  *
  * bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic iot-temperature-max --from-beginning
  *
- * On the other side, a console producer can be used for sending temperature values (which needs to be integers)
+ * <p>On the other side, a console producer can be used for sending temperature values (which needs to be integers)
  * to "iot-temperature" by typing them on the console :
  *
- * bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic iot-temperature
+ * <p>bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic iot-temperature
  * > 10
  * > 15
  * > 22
@@ -73,8 +73,8 @@ public static void main(final String[] args) {
         final Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-temperature");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
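The windowed maximum described in this Javadoc maps onto the DSL roughly as follows. This is a hedged sketch built from the topic names and constants in the comment above, not the demo's exact code:

    import java.time.Duration;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.TimeWindows;

    public class WindowedMaxSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            builder.<String, String>stream("iot-temperature")
                // Re-key all readings identically so they share the same window.
                .selectKey((key, value) -> "temperature")
                .groupByKey()
                // 5-second tumbling window, mirroring TEMPERATURE_WINDOW_SIZE.
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(5)))
                // Keep the larger reading; values arrive as numeric strings.
                .reduce((v1, v2) -> Integer.parseInt(v1) > Integer.parseInt(v2) ? v1 : v2)
                .toStream()
                // Forward a window's maximum only when it exceeds the threshold (20).
                .filter((windowedKey, value) -> Integer.parseInt(value) > 20)
                // Strip the window wrapper before writing to the output topic.
                .map((windowedKey, value) -> KeyValue.pair(windowedKey.key(), value))
                .to("iot-temperature-max");
        }
    }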
WordCountDemo.java
@@ -62,8 +62,8 @@ static Properties getStreamsConfig(final String[] args) throws IOException {
         props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
         props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.putIfAbsent(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
-        props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+        props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
         // Note: To re-run the demo, you need to use the offset reset tool:
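Note that the old lines here passed a fully-qualified class name (getClass().getName()) while the new ones pass a Class object; Kafka's config definition for serde classes accepts either form, so the change is behavior-neutral. A short sketch of the two accepted value types:

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsConfig;

    public class ConfigValueSketch {
        public static void main(final String[] args) {
            // A fully-qualified class name String is resolved to the class at config time.
            final Properties byName = new Properties();
            byName.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                    Serdes.StringSerde.class.getName());

            // A Class object is taken as-is; both forms select the same serde.
            final Properties byClass = new Properties();
            byClass.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                    Serdes.StringSerde.class);
        }
    }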
WordCountProcessorDemo.java
@@ -109,8 +109,8 @@ public static void main(final String[] args) throws IOException {
         props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
         props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.putIfAbsent(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
-        props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+        props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
         props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
WordCountTransformerDemo.java
@@ -100,9 +100,6 @@ public void process(final Record<String, String> record) {
                 }
             }
         }
-
-        @Override
-        public void close() {}
     };
 }
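The deleted close() override was redundant rather than wrong: in the processor API used here, close() is a default method with an empty body, so a processor with nothing to release can omit it entirely. A sketch with a hypothetical processor, not the demo's transformer:

    import java.util.Locale;
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;

    // Only process() is abstract; init() and close() have no-op default bodies.
    public class UppercaseProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(final ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(final Record<String, String> record) {
            context.forward(record.withValue(record.value().toUpperCase(Locale.ROOT)));
        }
        // No close() override needed: there is nothing to clean up.
    }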
WordCountTransformerDemo.java (continued)
@@ -128,8 +125,8 @@ public static void main(final String[] args) throws IOException {
         props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-transformer");
         props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.putIfAbsent(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
-        props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+        props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
         props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DeveloperGuideTesting.java
@@ -54,8 +54,8 @@ public class DeveloperGuideTesting {
     private TestOutputTopic<String, Long> outputTopic;
     private KeyValueStore<String, Long> store;

-    private Serde<String> stringSerde = new Serdes.StringSerde();
-    private Serde<Long> longSerde = new Serdes.LongSerde();
+    private final Serde<String> stringSerde = new Serdes.StringSerde();
+    private final Serde<Long> longSerde = new Serdes.LongSerde();

     @BeforeEach
     public void setup() {
@@ -72,8 +72,8 @@ public void setup() {

         // setup test driver
         final Properties props = new Properties();
-        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.LongSerde.class.getName());
         testDriver = new TopologyTestDriver(topology, props);

         // setup test topics
@@ -155,7 +155,6 @@ public static class CustomMaxAggregator implements Processor<String, Long, Strin
     ProcessorContext<String, Long> context;
     private KeyValueStore<String, Long> store;

-    @SuppressWarnings("unchecked")
     @Override
     public void init(final ProcessorContext<String, Long> context) {
         this.context = context;
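For orientation, the fields made final above belong to the standard TopologyTestDriver setup, which exercises a topology without a running broker. A hedged sketch of that wiring with a trivial pass-through topology (topology and topic names assumed, not this test's actual ones):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.TestInputTopic;
    import org.apache.kafka.streams.TestOutputTopic;
    import org.apache.kafka.streams.TopologyTestDriver;

    public class TestDriverSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input-topic").to("output-topic"); // pass-through topology

            final Properties props = new Properties();
            props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
            props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());

            // The driver pushes records through the topology synchronously, no broker needed.
            try (TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), props)) {
                final TestInputTopic<String, String> input = testDriver.createInputTopic(
                        "input-topic", Serdes.String().serializer(), Serdes.String().serializer());
                final TestOutputTopic<String, String> output = testDriver.createOutputTopic(
                        "output-topic", Serdes.String().deserializer(), Serdes.String().deserializer());

                input.pipeInput("key", "value");
                System.out.println(output.readKeyValue()); // KeyValue(key, value)
            }
        }
    }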