Skip to content

Commit

Permalink
Merge pull request #7439 from RasaHQ/kafka-list
Browse files Browse the repository at this point in the history
Support server list, drop group_id from producer
  • Loading branch information
akelad authored Jan 7, 2021
2 parents 94b86a9 + 554f824 commit 301b436
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 10 deletions.
2 changes: 2 additions & 0 deletions changelog/7439.improvement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
The `url` option now supports a list of servers `url: ['10.0.0.158:32803','10.0.0.158:32804']`.
Removed `group_id` because it is not a valid Kafka producer parameter.
18 changes: 8 additions & 10 deletions rasa/core/brokers/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ def __init__(
ssl_check_hostname=False,
topic="rasa_core_events",
client_id=None,
group_id=None,
security_protocol="SASL_PLAINTEXT",
loglevel=logging.ERROR,
) -> None:
Expand All @@ -32,7 +31,6 @@ def __init__(
self.host = host
self.topic = topic
self.client_id = client_id
self.group_id = group_id
self.security_protocol = security_protocol
self.sasl_username = sasl_username
self.sasl_password = sasl_password
Expand Down Expand Up @@ -85,40 +83,41 @@ def publish(self, event, retries=60, retry_delay_in_seconds=5) -> None:
def _create_producer(self) -> None:
import kafka

hosts = [self.host]
if type(self.host) == list:
hosts = self.host

if self.security_protocol == "SASL_PLAINTEXT":
self.producer = kafka.KafkaProducer(
bootstrap_servers=[self.host],
bootstrap_servers=hosts,
value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING),
sasl_plain_username=self.sasl_username,
sasl_plain_password=self.sasl_password,
sasl_mechanism="PLAIN",
security_protocol=self.security_protocol,
client_id=self.client_id,
group_id=self.group_id,
)
elif self.security_protocol == "PLAINTEXT":
self.producer = kafka.KafkaProducer(
bootstrap_servers=[self.host],
bootstrap_servers=hosts,
value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING),
security_protocol=self.security_protocol,
client_id=self.client_id,
group_id=self.group_id,
)
elif self.security_protocol == "SSL":
self.producer = kafka.KafkaProducer(
bootstrap_servers=[self.host],
bootstrap_servers=hosts,
value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING),
ssl_cafile=self.ssl_cafile,
ssl_certfile=self.ssl_certfile,
ssl_keyfile=self.ssl_keyfile,
ssl_check_hostname=False,
security_protocol=self.security_protocol,
client_id=self.client_id,
group_id=self.group_id,
)
elif self.security_protocol == "SASL_SSL":
self.producer = kafka.KafkaProducer(
bootstrap_servers=[self.host],
bootstrap_servers=hosts,
value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING),
ssl_cafile=self.ssl_cafile,
ssl_certfile=self.ssl_certfile,
Expand All @@ -129,7 +128,6 @@ def _create_producer(self) -> None:
sasl_mechanism="PLAIN",
security_protocol=self.security_protocol,
client_id=self.client_id,
group_id=self.group_id,
)
else:
logger.error("Kafka security_protocol invalid or not set")
Expand Down

0 comments on commit 301b436

Please sign in to comment.