diff --git a/docs/concepts/3_replay.md b/docs/concepts/3_replay.md
index 0f134ec..642af91 100644
--- a/docs/concepts/3_replay.md
+++ b/docs/concepts/3_replay.md
@@ -1,10 +1,10 @@
 # Replay
 
-This section explains how you can run a dag using historical data, typically stored in files or databases.
+This section explains how to run a beavers application using historical data, typically stored in files or databases.
 
 ## Manual Replay
 
-Starting with a simple data with one source going to one sink:
+Starting with a simple dag with one source going to one sink:
 
 ```python
 --8<-- "examples/replay_concepts.py:simple_dag"
diff --git a/docs/concepts/4_kafka.md b/docs/concepts/4_kafka.md
new file mode 100644
index 0000000..5e3ced1
--- /dev/null
+++ b/docs/concepts/4_kafka.md
@@ -0,0 +1,72 @@
+# Live with Kafka
+
+This section explains how to run a beavers application in real time using Kafka.
+
+## Count Word Example
+
+Starting with a simple "count word" dag with one source going to one sink:
+
+```python
+--8<-- "examples/kafka_concepts.py:dag"
+```
+
+This dag has a source node called `words` and a sink node called `counts`.
+
+## Defining Kafka Source
+
+We will be receiving data from Kafka, on a topic called `words`.
+
+First we need to define how we deserialize messages coming from Kafka:
+
+```python
+--8<-- "examples/kafka_concepts.py:deserializer"
+```
+
+Then, we put together the `SourceTopic` with its:
+
+- topic (`words`)
+- deserializer (`deserialize_messages`)
+- replay policy (`from_latest`)
+
+```python
+--8<-- "examples/kafka_concepts.py:kafka_source"
+```
+
+There are multiple Kafka replay policies available; see the API doc for the full list.
+
+## Defining Kafka Sink
+
+We will be sending the results to the `counts` topic.
+The key will be the word. The value will be the latest count.
+
+First we need to define a serializer, which converts each count to a `KafkaProducerMessage`:
+
+```python
+--8<-- "examples/kafka_concepts.py:serializer"
+```
+
+The serializer is responsible for providing the topic for each outgoing message.
+
+## Putting it together with KafkaDriver
+
+The `KafkaDriver` takes care of creating the Kafka producer and consumer, and passing messages through:
+
+```python
+--8<-- "examples/kafka_concepts.py:kafka_driver"
+```
+
+## Beavers Kafka Features
+
+- One consumer: There is only one consumer (rather than one consumer for each topic)
+- One producer: There is only one producer (rather than one producer for each topic)
+- When polling messages, beavers tries to read all available messages, up to a limit of `batch_size=5000` (which is configurable in the `KafkaDriver`)
+- When replaying past data, beavers orchestrates topics/partitions so data is replayed in order, across topics, based on each message timestamp.
+- When replaying past data, some newer messages have to be held back.
+  To avoid memory issues, the number of held messages is capped at `batch_size*5`.
+  Once the number of held messages gets too high, partitions that are ahead of the watermark are paused.
+  These partitions are un-paused once the application catches up.
+
+
+## Beavers Kafka Limitations
+
+- One beavers application consumes every partition for requested topics (no load balancing/scaling)
diff --git a/examples/kafka_concepts.py b/examples/kafka_concepts.py
new file mode 100644
index 0000000..4713b86
--- /dev/null
+++ b/examples/kafka_concepts.py
@@ -0,0 +1,89 @@
+# ruff: noqa: E402
+# isort: skip_file
+
+
+import confluent_kafka
+
+# --8<-- [start:dag]
+from beavers import Dag
+
+
+class CountWords:
+    state = {}
+
+    def __call__(self, new_words: list[str]) -> dict[str, int]:
+        for word in new_words:
+            self.state[word] = self.state.get(word, 0) + 1
+        return self.state
+
+
+def update_stream(
+    state: dict[str, int], updated_words: list[str]
+) -> list[tuple[str, int]]:
+    return [(word, state[word]) for word in set(updated_words)]
+
+
+dag = Dag()
+word_source = dag.source_stream(name="words")
+count_state = dag.state(CountWords()).map(word_source)
+count_stream = dag.stream(update_stream, []).map(count_state, word_source)
+dag.sink("counts", count_stream)
+# --8<-- [end:dag]
+
+
+# --8<-- [start:deserializer]
+def deserialize_messages(messages: list[confluent_kafka.Message]) -> list[str]:
+    return [message.value() for message in messages]
+
+
+# --8<-- [end:deserializer]
+
+# --8<-- [start:kafka_source]
+from beavers.kafka import SourceTopic, KafkaDriver
+
+source_topic = SourceTopic.from_latest("words", deserialize_messages)
+# --8<-- [end:kafka_source]
+
+
+# --8<-- [start:serializer]
+from beavers.kafka import KafkaProducerMessage
+
+
+def serialize_counts(values: list[tuple[str, int]]) -> list[KafkaProducerMessage]:
+    return [
+        KafkaProducerMessage(
+            topic="counts",
+            key=word,
+            value=str(count),
+        )
+        for word, count in values
+    ]
+
+
+# --8<-- [end:serializer]
+
+
+# --8<-- [start:kafka_driver]
+kafka_driver = KafkaDriver.create(
+    dag=dag,
+    consumer_config={
+        "enable.partition.eof": True,
+        "group.id": "beavers",
+        "bootstrap.servers": "localhost:9092",
+    },
+    producer_config={"bootstrap.servers": "localhost:9092"},
+    source_topics={"words": source_topic},
+    sink_topics={"counts": serialize_counts},
+)
+while True:
+    kafka_driver.run_cycle()
+# --8<-- [end:kafka_driver]
+
+
+# Note: you can test it with
+# kafka-console-producer --topic words --bootstrap-server=localhost:9092
+# And:
+# kafka-console-consumer \
+#     --topic=counts \
+#     --bootstrap-server=localhost:9092 \
+#     --property print.key=true
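
As a quick way to exercise the example end to end, a small script can stand in for `kafka-console-producer` in the testing note above. This is a minimal sketch, not part of the diff; it assumes a broker on `localhost:9092` (as used in the example config), uses the plain `confluent_kafka.Producer` client rather than anything from beavers, and the sample words are illustrative.

```python
# Sketch: push a few test words onto the "words" topic so the example dag
# has something to count. Assumes a Kafka broker at localhost:9092.
import confluent_kafka

producer = confluent_kafka.Producer({"bootstrap.servers": "localhost:9092"})
for word in ["hello", "world", "hello"]:  # illustrative sample data
    # Values arrive in deserialize_messages() as raw bytes.
    producer.produce("words", value=word.encode("utf-8"))
producer.flush()  # block until all messages are delivered
```

With the `KafkaDriver` loop running, the updated counts should then appear on the `counts` topic, which can be checked with the `kafka-console-consumer` command shown at the end of `examples/kafka_concepts.py`.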