Skip to content

Commit

Permalink
Updates for client_id and group_id
Browse files Browse the repository at this point in the history
  • Loading branch information
alwx committed Oct 16, 2020
1 parent 7369560 commit 1c26d58
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions rasa/core/brokers/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ def __init__(
url: Union[Text, List[Text], None],
topic: Text = "rasa_core_events",
client_id: Optional[Text] = None,
group_id: Optional[Text] = None,
sasl_username: Optional[Text] = None,
sasl_password: Optional[Text] = None,
ssl_cafile: Optional[Text] = None,
Expand Down Expand Up @@ -63,7 +62,6 @@ def __init__(
self.url = url
self.topic = topic
self.client_id = client_id
self.group_id = group_id
self.security_protocol = security_protocol.upper()
self.sasl_username = sasl_username
self.sasl_password = sasl_password
Expand Down Expand Up @@ -93,13 +91,15 @@ def _create_producer(self) -> None:

if self.security_protocol == "PLAINTEXT":
self.producer = kafka.KafkaProducer(
client_id=self.client_id,
bootstrap_servers=self.url,
value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING),
security_protocol=self.security_protocol,
ssl_check_hostname=False,
)
elif self.security_protocol == "SASL_PLAINTEXT":
self.producer = kafka.KafkaProducer(
client_id=self.client_id,
bootstrap_servers=self.url,
value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING),
sasl_plain_username=self.sasl_username,
Expand All @@ -109,6 +109,7 @@ def _create_producer(self) -> None:
)
elif self.security_protocol == "SSL":
self.producer = kafka.KafkaProducer(
client_id=self.client_id,
bootstrap_servers=self.url,
value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING),
ssl_cafile=self.ssl_cafile,
Expand All @@ -119,6 +120,7 @@ def _create_producer(self) -> None:
)
elif self.security_protocol == "SASL_SSL":
self.producer = kafka.KafkaProducer(
client_id=self.client_id,
bootstrap_servers=self.url,
value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING),
sasl_plain_username=self.sasl_username,
Expand Down

0 comments on commit 1c26d58

Please sign in to comment.