Merge pull request #31 from tradewelltech/kafka-doc
Add kafka doc
0x26res authored Sep 11, 2023
2 parents 43bcbee + 806a471 commit 95b0e74
Showing 3 changed files with 163 additions and 2 deletions.
4 changes: 2 additions & 2 deletions docs/concepts/3_replay.md
@@ -1,10 +1,10 @@
# Replay

-This section explains how you can run a dag using historical data, typically stored in files or databases.
+This section explains how to run a beavers application using historical data, typically stored in files or databases.

## Manual Replay

-Starting with a simple data with one source going to one sink:
+Starting with a simple dag with one source going to one sink:

```python
--8<-- "examples/replay_concepts.py:simple_dag"
72 changes: 72 additions & 0 deletions docs/concepts/4_kafka.md
@@ -0,0 +1,72 @@
# Live with Kafka

This section explains how to run a beavers application in real time using Kafka.

## Count Word Example

Starting with a simple "count word" dag with one source going to one sink:

```python
--8<-- "examples/kafka_concepts.py:dag"
```

This dag has a source node called `words` and a sink node called `counts`.

## Defining Kafka Source

We will be receiving data from Kafka on a topic called `words`.

First, we need to define how to deserialize messages coming from Kafka:

```python
--8<-- "examples/kafka_concepts.py:deserializer"
```

Then, we put together the `SourceTopic` with its:

- topic (`words`)
- deserializer (`deserialize_messages`)
- replay policy (`from_latest`)

```python
--8<-- "examples/kafka_concepts.py:kafka_source"
```

There are multiple Kafka replay policies available; see the API doc for the full list.
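
For example, a source that replays the whole topic, or only a recent window of it, would be set up the same way as `from_latest`. The factory names below (`from_earliest`, `from_relative_time`) are assumptions for illustration; confirm them against the API doc.

```python
import datetime

from beavers.kafka import SourceTopic

# Hedged sketch: only `from_latest` is used in this guide; the factories
# below are assumed to exist, check the API doc for the actual list.
replay_everything = SourceTopic.from_earliest("words", deserialize_messages)
replay_last_hour = SourceTopic.from_relative_time(
    "words", deserialize_messages, relative_time=datetime.timedelta(hours=1)
)
```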

## Defining Kafka Sink

We will be sending the results to the `counts` topic.
The key will be the word. The value will be the latest count.

First, we need to define a serializer, which converts each count to a `KafkaProducerMessage`:

```python
--8<-- "examples/kafka_concepts.py:serializer"
```

The serializer is responsible for providing the topic for each outgoing message.
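
Because each `KafkaProducerMessage` carries its own topic, a serializer can also route messages to several topics. Below is a minimal sketch; the `counts_high` and `counts_low` topic names are made up purely for illustration.

```python
from beavers.kafka import KafkaProducerMessage


def serialize_counts_routed(
    values: list[tuple[str, int]]
) -> list[KafkaProducerMessage]:
    # Route each count to an illustrative topic based on its value
    return [
        KafkaProducerMessage(
            topic="counts_high" if count >= 100 else "counts_low",
            key=word,
            value=str(count),
        )
        for word, count in values
    ]
```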

## Putting it together with KafkaDriver

The `KafkaDriver` takes care of creating the Kafka producer and consumer, and of passing messages through:

```python
--8<-- "examples/kafka_concepts.py:kafka_driver"
```

## Beavers Kafka Features

- One consumer: There is only one consumer (rather than one consumer for each topic)
- One producer: There is only one producer (rather than one producer for each topic)
- When polling messages, beavers tries to read all available messages, up to a limit of `batch_size=5000` (configurable in the `KafkaDriver`; see the sketch after this list)
- When replaying past data, beavers orchestrates topics/partitions so data is replayed in order, across topics, based on each message's timestamp.
- When replaying past data, some newer messages have to be held back.
  To avoid memory issues, the number of held messages is capped at `batch_size * 5`.
  Once the number of held messages gets too high, partitions that are ahead of the watermark are paused.
  These partitions are un-paused once the application catches up.
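
For reference, a smaller batch size could be configured like the sketch below. The `batch_size` keyword is assumed from the note above; check the `KafkaDriver` API doc for the exact parameter name.

```python
from beavers.kafka import KafkaDriver

kafka_driver = KafkaDriver.create(
    dag=dag,
    consumer_config={"group.id": "beavers", "bootstrap.servers": "localhost:9092"},
    producer_config={"bootstrap.servers": "localhost:9092"},
    source_topics={"words": source_topic},
    sink_topics={"counts": serialize_counts},
    batch_size=1000,  # assumed keyword, based on the note above
)
```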


## Beavers Kafka Limitations

- One beavers application consumes every partition for requested topics (no load balancing/scaling)
89 changes: 89 additions & 0 deletions examples/kafka_concepts.py
@@ -0,0 +1,89 @@
# ruff: noqa: E402
# isort: skip_file


import confluent_kafka

# --8<-- [start:dag]
from beavers import Dag


class CountWords:
    def __init__(self) -> None:
        # Running count of how many times each word has been seen
        self.state: dict[str, int] = {}

    def __call__(self, new_words: list[str]) -> dict[str, int]:
        for word in new_words:
            self.state[word] = self.state.get(word, 0) + 1
        return self.state


def update_stream(
    state: dict[str, int], updated_words: list[str]
) -> list[tuple[str, int]]:
    return [(word, state[word]) for word in set(updated_words)]


dag = Dag()
word_source = dag.source_stream(name="words")
count_state = dag.state(CountWords()).map(word_source)
count_stream = dag.stream(update_stream, []).map(count_state, word_source)
dag.sink("counts", count_stream)
# --8<-- [end:dag]


# --8<-- [start:deserializer]
def deserialize_messages(messages: list[confluent_kafka.Message]) -> list[str]:
    # Kafka values arrive as bytes; decode them so downstream nodes see str
    return [message.value().decode("utf-8") for message in messages]


# --8<-- [end:deserializer]

# --8<-- [start:kafka_source]
from beavers.kafka import SourceTopic, KafkaDriver

source_topic = SourceTopic.from_latest("words", deserialize_messages)
# --8<-- [end:kafka_source]


# --8<-- [start:serializer]
from beavers.kafka import KafkaProducerMessage


def serialize_counts(values: list[tuple[str, int]]) -> list[KafkaProducerMessage]:
    return [
        KafkaProducerMessage(
            topic="counts",
            key=word,
            value=str(count),
        )
        for word, count in values
    ]


# --8<-- [end:serializer]


# --8<-- [start:kafka_driver]
kafka_driver = KafkaDriver.create(
    dag=dag,
    consumer_config={
        "enable.partition.eof": True,
        "group.id": "beavers",
        "bootstrap.servers": "localhost:9092",
    },
    producer_config={"bootstrap.servers": "localhost:9092"},
    source_topics={"words": source_topic},
    sink_topics={"counts": serialize_counts},
)
while True:
    kafka_driver.run_cycle()
# --8<-- [end:kafka_driver]


# Note: you can test it with
# kafka-console-producer --topic words --bootstrap-server=localhost:9092
# And:
# kafka-console-consumer \
#     --topic=counts \
#     --bootstrap-server=localhost:9092 \
#     --property print.key=true
