Skip to content

Commit

Permalink
Kafka Integration (#92)
Browse files Browse the repository at this point in the history
  • Loading branch information
devhawk authored Sep 9, 2024
1 parent fc76fb7 commit dc55341
Show file tree
Hide file tree
Showing 9 changed files with 287 additions and 237 deletions.
29 changes: 28 additions & 1 deletion .github/workflows/unit-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,34 @@ jobs:
ports:
# Maps tcp port 5432 on service container to the host
- 5432:5432

# Zookeeper service container (required by Kafka)
zookeeper:
image: bitnami/zookeeper
ports:
- 2181:2181
env:
ALLOW_ANONYMOUS_LOGIN: yes
options: >-
--health-cmd "echo mntr | nc -w 2 -q 2 localhost 2181"
--health-interval 10s
--health-timeout 5s
--health-retries 5
# Kafka service container
kafka:
image: bitnami/kafka
ports:
- 9092:9092
options: >-
--health-cmd "kafka-broker-api-versions.sh --version"
--health-interval 10s
--health-timeout 5s
--health-retries 5
env:
KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
ALLOW_PLAINTEXT_LISTENER: yes
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092

steps:
- uses: actions/checkout@v4
with:
Expand Down
2 changes: 2 additions & 0 deletions dbos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from .context import DBOSContextEnsure, SetWorkflowID
from .dbos import DBOS, DBOSConfiguredInstance, WorkflowHandle, WorkflowStatus
from .dbos_config import ConfigFile, get_dbos_database_url, load_config
from .kafka_message import KafkaMessage
from .system_database import GetWorkflowsInput, WorkflowStatusString

__all__ = [
Expand All @@ -10,6 +11,7 @@
"DBOSConfiguredInstance",
"DBOSContextEnsure",
"GetWorkflowsInput",
"KafkaMessage",
"SetWorkflowID",
"WorkflowHandle",
"WorkflowStatus",
Expand Down
17 changes: 17 additions & 0 deletions dbos/dbos.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,14 @@

if TYPE_CHECKING:
from fastapi import FastAPI
from dbos.kafka import KafkaConsumerWorkflow
from .request import Request
from flask import Flask

from sqlalchemy.orm import Session

from dbos.request import Request

if sys.version_info < (3, 10):
from typing_extensions import ParamSpec, TypeAlias
else:
Expand Down Expand Up @@ -506,6 +509,20 @@ def scheduled(cls, cron: str) -> Callable[[ScheduledWorkflow], ScheduledWorkflow

return scheduled(_get_or_create_dbos_registry(), cron)

@classmethod
def kafka_consumer(
    cls, config: dict[str, Any], topics: list[str]
) -> Callable[[KafkaConsumerWorkflow], KafkaConsumerWorkflow]:
    """Decorate a function to be used as a Kafka consumer.

    Args:
        config: Configuration dictionary passed through to the underlying
            Kafka consumer (e.g. bootstrap servers, group id).
        topics: Kafka topic names the consumer subscribes to.

    Returns:
        A decorator that registers the wrapped function as a consumer
        workflow and returns it unchanged.

    Raises:
        DBOSException: If the optional Kafka dependency is not installed.
    """
    try:
        # Imported lazily so the Kafka client library remains an
        # optional dependency of the package.
        from dbos.kafka import kafka_consumer
    except ModuleNotFoundError as e:
        raise DBOSException(
            f"{e.name} dependency not found. Please install {e.name} via your package manager."
        ) from e
    # Kept outside the try block so a ModuleNotFoundError raised during
    # registration itself is not misreported as a missing dependency.
    return kafka_consumer(_get_or_create_dbos_registry(), config, topics)

@classmethod
def start_workflow(
cls,
Expand Down
94 changes: 94 additions & 0 deletions dbos/kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import threading
import traceback
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Callable, Generator, NoReturn, Optional, Union

from confluent_kafka import Consumer, KafkaError, KafkaException
from confluent_kafka import Message as CTypeMessage

if TYPE_CHECKING:
from dbos.dbos import _DBOSRegistry

from .context import SetWorkflowID
from .kafka_message import KafkaMessage
from .logger import dbos_logger

KafkaConsumerWorkflow = Callable[[KafkaMessage], None]


def _kafka_consumer_loop(
    func: KafkaConsumerWorkflow,
    config: dict[str, Any],
    topics: list[str],
    stop_event: threading.Event,
) -> None:
    """Poll Kafka in a loop, invoking *func* once per received message.

    Runs until *stop_event* is set. Each message is wrapped in a
    :class:`KafkaMessage` and dispatched under a deterministic workflow ID
    derived from topic/partition/offset, so redelivered messages map to the
    same workflow.

    Args:
        func: Consumer workflow invoked with each received message.
        config: Kafka consumer configuration; not mutated by this function.
        topics: Topics to subscribe to.
        stop_event: Signals the loop to shut down and close the consumer.
    """

    def on_error(err: KafkaError) -> NoReturn:
        raise KafkaException(err)

    # Work on a shallow copy so the caller's config dict is not mutated
    # (the original code injected "error_cb" into the caller's dict).
    config = dict(config)
    config["error_cb"] = on_error
    # Default to reading from the beginning of the topic unless the caller
    # explicitly configured an offset-reset policy.
    config.setdefault("auto.offset.reset", "earliest")

    consumer = Consumer(config)
    try:
        consumer.subscribe(topics)
        while not stop_event.is_set():
            cmsg = consumer.poll(1.0)

            # Re-check after the (up to 1s) poll so shutdown is prompt.
            if stop_event.is_set():
                return

            if cmsg is None:
                continue

            err = cmsg.error()
            if err is not None:
                dbos_logger.error(
                    f"Kafka error {err.code()} ({err.name()}): {err.str()}"
                )
                # fatal errors require an updated consumer instance
                if err.code() == KafkaError._FATAL or err.fatal():
                    original_consumer = consumer
                    try:
                        consumer = Consumer(config)
                        consumer.subscribe(topics)
                    finally:
                        # Close the old consumer even if re-creation fails.
                        original_consumer.close()
            else:
                msg = KafkaMessage(
                    headers=cmsg.headers(),
                    key=cmsg.key(),
                    latency=cmsg.latency(),
                    leader_epoch=cmsg.leader_epoch(),
                    offset=cmsg.offset(),
                    partition=cmsg.partition(),
                    timestamp=cmsg.timestamp(),
                    topic=cmsg.topic(),
                    value=cmsg.value(),
                )
                # Deterministic ID: the same message always maps to the
                # same workflow, giving at-most-once workflow starts.
                with SetWorkflowID(
                    f"kafka-unique-id-{msg.topic}-{msg.partition}-{msg.offset}"
                ):
                    try:
                        func(msg)
                    except Exception:
                        # Log and keep consuming; one bad message must not
                        # kill the consumer thread.
                        dbos_logger.error(
                            f"Exception encountered in Kafka consumer: {traceback.format_exc()}"
                        )
    finally:
        consumer.close()


def kafka_consumer(
    dbosreg: "_DBOSRegistry", config: dict[str, Any], topics: list[str]
) -> Callable[[KafkaConsumerWorkflow], KafkaConsumerWorkflow]:
    """Return a decorator that registers a function as a Kafka consumer.

    The decorated function is registered with the DBOS registry as a
    background poller running `_kafka_consumer_loop`; the function itself
    is returned unmodified.

    Args:
        dbosreg: Registry that schedules the poller thread.
        config: Kafka consumer configuration forwarded to the loop.
        topics: Topics the consumer subscribes to.
    """

    def decorator(func: KafkaConsumerWorkflow) -> KafkaConsumerWorkflow:
        # One shutdown event per registered consumer; the registry uses it
        # to stop the poller, and the loop watches it to exit cleanly.
        shutdown = threading.Event()
        dbosreg.register_poller(
            shutdown, _kafka_consumer_loop, func, config, topics, shutdown
        )
        return func

    return decorator
15 changes: 15 additions & 0 deletions dbos/kafka_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from dataclasses import dataclass
from typing import Optional, Union


@dataclass
class KafkaMessage:
    """A single record received from Kafka.

    Field values are taken verbatim from the corresponding accessor
    methods of the underlying client's message object (``headers()``,
    ``key()``, ``offset()``, etc.).
    """

    # Optional list of (name, value) header pairs
    headers: Union[list[tuple[str, Union[str, bytes]]], None]
    # Message key, if the producer set one
    key: Union[str, bytes, None]
    # Producer-to-broker latency in seconds, when available
    latency: Union[float, None]
    leader_epoch: Union[int, None]
    # Position of the record within its partition
    offset: Union[int, None]
    partition: Union[int, None]
    # (timestamp type, timestamp value) pair as reported by the client
    timestamp: tuple[int, int]
    topic: Union[str, None]
    # Message payload
    value: Union[str, bytes, None]
Loading

0 comments on commit dc55341

Please sign in to comment.