
Commit

Finished exercises
oleksandrivan committed Sep 28, 2023
1 parent c8b4375 commit 7e9214d
Showing 8 changed files with 109 additions and 40 deletions.
37 changes: 37 additions & 0 deletions kafka-streams/docker/docker-compose.yml
@@ -0,0 +1,37 @@
version: '2'
services:
  broker:
    image: confluentinc/cp-kafka:7.4.1
    hostname: broker
    container_name: broker
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
      KAFKA_LISTENERS: PLAINTEXT://broker:9092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - 8081:8081
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:9092
      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: WARN
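With this compose file, the broker is reachable from the host on localhost:29092 (the PLAINTEXT_HOST listener) and Schema Registry on localhost:8081. Below is a minimal sketch of the client-side settings a Streams exercise could use against it; the class name and application id are placeholders, and the exercises themselves appear to load their settings from a properties file.

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class LocalClusterConfig {
    // Client-side view of the containers above; values mirror the published ports.
    static Properties baseProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-exercise"); // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");     // PLAINTEXT_HOST listener
        props.put("schema.registry.url", "http://localhost:8081");                // for the Avro serdes
        return props;
    }
}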
@@ -39,7 +39,12 @@ public static void main(String[] args) throws IOException {

// Now take the electronicStream object, group by key and perform an aggregation
// Don't forget to convert the KTable returned by the aggregate call back to a KStream using the toStream method
electronicStream.groupByKey().aggregate(null, null);
electronicStream.groupByKey().aggregate(() -> 0.0,
(key, value, aggregate) -> aggregate + value.getPrice(),
Materialized.with(Serdes.String(), Serdes.Double())
).toStream()
.peek((key, value) -> System.out.println("Outgoing record - key " + key + " value " + value))
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Double()));

// To view the results of the aggregation, consider adding the following right after the toStream() method:
// .peek((key, value) -> System.out.println("Outgoing record - key " + key + " value " + value))
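As a quick sanity check of the aggregator above, it can help to trace a few records by hand; the prices below are made up purely for illustration.

// Hypothetical trace for one key, using the initializer () -> 0.0 and the adder above:
//   price 439.99 -> aggregate = 0.0    + 439.99 = 439.99
//   price 333.18 -> aggregate = 439.99 + 333.18 = 773.17
//   price 354.99 -> aggregate = 773.17 + 354.99 = 1128.16
// With record caching enabled, the output topic may only show the latest total per key
// at each commit rather than every intermediate value.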
@@ -34,9 +34,12 @@ public static class StreamsDeserializationErrorHandler implements DeserializationExceptionHandler {
public DeserializationHandlerResponse handle(ProcessorContext context,
ConsumerRecord<byte[], byte[]> record,
Exception exception) {
if(++errorCounter > 25) {
return DeserializationHandlerResponse.FAIL;
}
// This return null statement is here so the code will compile
// You need to replace it with some logic described below
return null;
return DeserializationHandlerResponse.CONTINUE;
// If the number of errors remains under 25 continue processing
// Otherwise fail
// Note in both cases you'll return a DeserializationHandlerResponse ENUM
@@ -55,7 +58,10 @@ public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]>
Exception exception) {
// This return null statement is here so the code will compile
// You need to replace it with some logic described below
return null;
if(exception instanceof RecordTooLargeException){
return ProductionExceptionHandlerResponse.CONTINUE;
}
return ProductionExceptionHandlerResponse.FAIL;
// If the exception type is RecordTooLargeException continue working
// Otherwise fail
// Note in both cases you'll return a ProductionExceptionHandlerResponse ENUM
@@ -71,7 +77,10 @@ public static class StreamsCustomUncaughtExceptionHandler implements StreamsUncaughtExceptionHandler {
public StreamThreadExceptionResponse handle(Throwable exception) {
// This return null statement is here so the code will compile
// You need to replace it with some logic described below
return null;
if (exception instanceof StreamsException
        && exception.getCause() != null
        && "Retryable transient error".equals(exception.getCause().getMessage())) {
    return StreamThreadExceptionResponse.REPLACE_THREAD;
}
return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;

// Check if the exception is a StreamsException
// If it is - get the underlying Throwable HINT: exception.getCause()
@@ -91,8 +100,8 @@ public static void main(String[] args) throws IOException {
// HINT: look in StreamsConfig for Deserialization and Production to get the correct
// static string configuration names

streamsProps.put("????", null);
streamsProps.put("???", null);
streamsProps.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, StreamsDeserializationErrorHandler.class);
streamsProps.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, StreamsRecordProducerErrorHandler.class);

StreamsBuilder builder = new StreamsBuilder();
final String inputTopic = streamsProps.getProperty("error.input.topic");
@@ -116,6 +125,7 @@ public static void main(String[] args) throws IOException {
.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

try (KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps)) {
kafkaStreams.setUncaughtExceptionHandler(new StreamsCustomUncaughtExceptionHandler());
final CountDownLatch shutdownLatch = new CountDownLatch(1);

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
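For reference, the two default handlers are plain Streams configuration entries (a fully qualified class name as a String also works), while the uncaught-exception handler has to be registered on the KafkaStreams client itself before start() is called. A minimal sketch of that wiring, assuming it sits alongside the handler classes defined in this exercise and is handed an already-built StreamsBuilder and properties:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class ErrorHandlerWiring {
    public static KafkaStreams wire(StreamsBuilder builder, Properties streamsProps) {
        // Config-driven handlers
        streamsProps.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                StreamsDeserializationErrorHandler.class);
        streamsProps.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                StreamsRecordProducerErrorHandler.class);

        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps);
        // Instance-level handler: must be set before start()
        kafkaStreams.setUncaughtExceptionHandler(new StreamsCustomUncaughtExceptionHandler());
        return kafkaStreams;
    }
}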
@@ -79,27 +79,16 @@ public static void main(String[] args) throws IOException {
KTable<String, User> userTable =
builder.table(tableInput, Materialized.with(Serdes.String(), userSerde));

KStream<String, CombinedOrder> combinedStream = null;
// create a Join between the applianceStream and the electronicStream
// using the ValueJoiner created above, orderJoiner gets you the correct value type of CombinedOrder
// You want to join records within 30 minutes of each other HINT: JoinWindows and Duration.ofMinutes
// Add the correct Serdes for the join state stores remember both sides have same key type
// HINT: StreamJoined and Serdes.String and Serdes for the applianceStream and electronicStream created above

// Optionally add this statement after the join to see the results on the console
// .peek((key, value) -> System.out.println("Stream-Stream Join record key " + key + " value " + value));


// Now join the combinedStream with the userTable,
// but you'll always want a result even if no corresponding entry is found in the table
// Using the ValueJoiner created above, enrichmentJoiner, return a CombinedOrder instance enriched with user information
// You'll need to add a Joined instance with the correct Serdes for the join state store

// Add these two statements after the join call to print results to the console and write results out
// to a topic

// .peek((key, value) -> System.out.println("Stream-Table Join record key " + key + " value " + value))
// .to(outputTopic, Produced.with(Serdes.String(), combinedSerde));
KStream<String, CombinedOrder> combinedStream = applianceStream.join(
electronicStream,
orderJoiner,
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(30)),
StreamJoined.with(Serdes.String(), applianceSerde, electronicSerde)
).peek((key, value) -> System.out.println("Stream-Stream Join record key " + key + " value " + value));

combinedStream.leftJoin(userTable, enrichmentJoiner, Joined.with(Serdes.String(), combinedSerde, userSerde))
.peek((key, value) -> System.out.println("Stream-Table Join record key " + key + " value " + value))
.to(outputTopic, Produced.with(Serdes.String(), combinedSerde));

try (KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps)) {
final CountDownLatch shutdownLatch = new CountDownLatch(1);
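The orderJoiner and enrichmentJoiner used above are defined earlier in the file and are not part of this diff. Roughly, they would look like the sketch below; the ApplianceOrder, ElectronicOrder, CombinedOrder and User accessors shown are assumptions, since those types are not included here.

import org.apache.kafka.streams.kstream.ValueJoiner;

// Stream-stream joiner: combine an appliance order and an electronic order for the same key
ValueJoiner<ApplianceOrder, ElectronicOrder, CombinedOrder> orderJoiner =
        (applianceOrder, electronicOrder) -> CombinedOrder.newBuilder()
                .setApplianceOrderId(applianceOrder.getOrderId())
                .setElectronicOrderId(electronicOrder.getOrderId())
                .build();

// Stream-table joiner: leftJoin passes null for the table side when no user is found
ValueJoiner<CombinedOrder, User, CombinedOrder> enrichmentJoiner = (combined, user) -> {
    if (user != null) {
        combined.setUserName(user.getName());
    }
    return combined;
};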
@@ -48,12 +48,23 @@ public void init(ProcessorContext<String, Double> context) {
// Save reference of the context
// Retrieve the store and save a reference
// Schedule a punctuation HINT: use context.schedule and the method you want to call is forwardAll
this.context = context;
this.store = context.getStateStore(storeName);

context.schedule(Duration.ofSeconds(10), PunctuationType.STREAM_TIME, this::forwardAll);
}

private void forwardAll(final long timestamp) {
// Get a KeyValueIterator HINT there's a method on the KeyValueStore
// Don't forget to close the iterator! HINT use try-with resources
// Iterate over the records and create a Record instance and forward downstream HINT use a method on the ProcessorContext to forward
try (var iterator = store.all()){
while (iterator.hasNext()){
var keyValue = iterator.next();
context.forward(new Record<>(keyValue.key, keyValue.value, timestamp));
System.out.println("Punctuation forwarded record - key " + keyValue.key + " value " + keyValue.value);
}
}
}

@Override
@@ -62,6 +73,14 @@ public void process(Record<String, ElectronicOrder> record) {
// Don't forget to check for null
// Add the price from the value to the current total from store and put it in the store
// HINT state stores are key-value stores
var key = record.key();
var currentTotal = store.get(key);
if (currentTotal == null) {
    currentTotal = 0.0;
}
// Add this record's price to the running total so the first order for a key is not dropped
store.put(key, currentTotal + record.value().getPrice());
System.out.println("Processed incoming record - key " + key + " value " + record.value());
}
};
}
@@ -92,6 +111,13 @@ public static void main(String[] args) throws IOException {

final Topology topology = new Topology();

String sourceName = "source-node";
topology.addSource(sourceName, stringSerde.deserializer(), electronicSerde.deserializer(), inputTopic);
String processorName = "processor-node";
topology.addProcessor(processorName, new TotalPriceOrderProcessorSupplier(storeName), sourceName);
topology.addSink("sink-name", outputTopic, stringSerde.serializer(), doubleSerde.serializer(), processorName);


// Add a source node to the topology HINT: topology.addSource
// Give it a name, add deserializers for the key and the value and provide the input topic name

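The processor retrieves a KeyValueStore named storeName, but the store registration itself is outside the lines shown here. One common approach, assumed rather than confirmed by this diff, is to override stores() on the TotalPriceOrderProcessorSupplier so the store is created and connected to the processor automatically:

import java.util.Collections;
import java.util.Set;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Inside TotalPriceOrderProcessorSupplier:
@Override
public Set<StoreBuilder<?>> stores() {
    StoreBuilder<KeyValueStore<String, Double>> totalPriceStoreBuilder =
            Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore(storeName),
                    Serdes.String(),
                    Serdes.Double());
    return Collections.singleton(totalPriceStoreBuilder);
}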
@@ -29,8 +29,8 @@ static class OrderTimestampExtractor implements TimestampExtractor {
public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
// Extract the timestamp from the value in the record
// and return that instead
return -1L;

final ElectronicOrder order = (ElectronicOrder) record.value();
return order.getTime();
}
}

@@ -49,7 +49,7 @@ public static void main(String[] args) throws IOException {

final KStream<String, ElectronicOrder> electronicStream =
builder.stream(inputTopic,
Consumed.with(Serdes.String(), electronicSerde))
Consumed.with(Serdes.String(), electronicSerde).withTimestampExtractor(new OrderTimestampExtractor()))
//Wire up the timestamp extractor HINT do it on the Consumed object vs configs
.peek((key, value) -> System.out.println("Incoming record - key " + key + " value " + value));

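The extractor trusts that every value is an ElectronicOrder carrying a usable timestamp. A slightly more defensive variant, purely illustrative and not part of the exercise, falls back to the partition time otherwise:

@Override
public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
    Object value = record.value();
    if (value instanceof ElectronicOrder) {
        long time = ((ElectronicOrder) value).getTime();
        if (time > 0) {
            return time;
        }
    }
    // Fall back to the timestamp Streams would have used anyway
    return partitionTime;
}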
@@ -8,11 +8,7 @@
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.*;

import java.io.IOException;
import java.time.Duration;
@@ -44,14 +40,17 @@ public static void main(String[] args) throws IOException {

electronicStream.groupByKey()
// Window the aggregation by the hour and allow for records to be up to 5 minutes late
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofHours(1), Duration.ofMinutes(5)))
.aggregate(() -> 0.0,
(key, order, total) -> total + order.getPrice(),
Materialized.with(Serdes.String(), Serdes.Double()))
// Don't emit results until the window closes HINT suppression
.suppress(untilWindowCloses(unbounded()))
.toStream()
// When windowing Kafka Streams wraps the key in a Windowed class
// After converting the table to a stream it's a good idea to extract the
// Underlying key from the Windowed instance HINT: use map
.map((key, value) -> KeyValue.pair(key.key(), value) )
.peek((key, value) -> System.out.println("Outgoing record - key " + key + " value " + value))
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Double()));

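Note that untilWindowCloses and unbounded are static members of Suppressed and Suppressed.BufferConfig, so beyond the wildcard kstream import the file presumably also carries static imports along these lines:

import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;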
@@ -55,12 +55,13 @@ public void shouldAggregateRecords() {
// You'll need a Topology and properties HINT: StreamsBuilder.build() and streamsProps
// You always want to use a TopologyTestDriver in a try-with-resources block to make sure it
// gets closed properly, which will ensure any local state is cleaned up
try (final TopologyTestDriver testDriver = null) {
try (final TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), streamsProps)) {
// Complete the TestInputTopic HINT: it needs a topic name and serializers for the key and value
final TestInputTopic<String, ElectronicOrder> inputTopic = null;
final TestInputTopic<String, ElectronicOrder> inputTopic = testDriver
.createInputTopic(inputTopicName, stringSerde.serializer(), electronicSerde.serializer());

// Complete the TestOutputTopic HINT: it needs a topic name and deserializers for the key and value
final TestOutputTopic<String, Double> outputTopic = null;
final TestOutputTopic<String, Double> outputTopic = testDriver.createOutputTopic(outputTopicName, stringSerde.deserializer(), doubleSerde.deserializer());

// Mock records for the test
final List<ElectronicOrder> orders = new ArrayList<>();
@@ -74,11 +75,13 @@

// Run the mock records through the topology HINT use the inputTopic above
// and pipe each record through; make sure to use the key of the order
orders.forEach(order -> inputTopic.pipeInput(order.getElectronicId(), order));

// Read the values from the topology HINT use the outputTopic to read all values as list
List<Double> actualValues = null;
List<Double> actualValues = outputTopic.readValuesToList();
// assert the actualValues return matches the expected values
// HINT assertEquals(expected, actual);
assertEquals(expectedValues, actualValues);
}

}
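Because TopologyTestDriver runs the topology entirely in-process, streamsProps does not need to point at the compose cluster from the first file. Depending on the Streams version, an application id and bootstrap servers may still be validated, so dummy values are a safe choice; the values below are placeholders, and the real test may load its properties from a file instead.

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties streamsProps = new Properties();
streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregate-test");
streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");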
