Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert Intent ID Hashes from Integer to String #8929

Merged
merged 10 commits into from
Jul 21, 2021
1 change: 1 addition & 0 deletions changelog/8929.improvement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added optional flag to convert intent ID hashes from integer to string in the `KafkaEventBroker`.
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ event_broker:
sasl_username: username
sasl_password: password
sasl_mechanism: PLAIN
convert_intent_id_to_string: True
19 changes: 18 additions & 1 deletion rasa/core/brokers/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def __init__(
ssl_check_hostname: bool = False,
security_protocol: Text = "SASL_PLAINTEXT",
loglevel: Union[int, Text] = logging.ERROR,
convert_intent_id_to_string: bool = False,
**kwargs: Any,
) -> None:
"""Kafka event broker.
Expand Down Expand Up @@ -68,7 +69,8 @@ def __init__(
security_protocol: Protocol used to communicate with brokers.
Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
loglevel: Logging level of the kafka logger.

convert_intent_id_to_string: Optional flag to configure whether intent ID's
are converted from an integer to a string.
"""
import kafka

Expand All @@ -85,6 +87,7 @@ def __init__(
self.ssl_certfile = ssl_certfile
self.ssl_keyfile = ssl_keyfile
self.ssl_check_hostname = ssl_check_hostname
self.convert_intent_id_to_string = convert_intent_id_to_string

logging.getLogger("kafka").setLevel(loglevel)

Expand All @@ -107,6 +110,8 @@ def publish(
retry_delay_in_seconds: float = 5,
) -> None:
"""Publishes events."""
if self.convert_intent_id_to_string:
b-quachtran marked this conversation as resolved.
Show resolved Hide resolved
event = self._convert_intent_id_to_string(event)
if self.producer is None:
self._create_producer()
connected = self.producer.bootstrap_connected()
Expand Down Expand Up @@ -200,5 +205,17 @@ def _publish(self, event: Dict[Text, Any]) -> None:
)
self.producer.send(self.topic, value=event, key=partition_key)

def _convert_intent_id_to_string(self, event: Dict[Text, Any]) -> Dict[Text, Any]:
if event.get("event", "") == "user" and "id" in event.get("parse_data", {}).get(
"intent", {}
):
event["parse_data"]["intent"]["id"] = str(
b-quachtran marked this conversation as resolved.
Show resolved Hide resolved
event["parse_data"]["intent"]["id"]
)
for idx, parse_data in enumerate(event["parse_data"]["intent_ranking"]):
parse_data["id"] = str(parse_data["id"])
event["parse_data"]["intent_ranking"][idx] = parse_data
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need this line (or the enumerate) as you are modifying the dict in the line above.

Copy link
Contributor Author

@b-quachtran b-quachtran Jul 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@joejuzl intent_ranking is a separate field from intent and contains the full breakdown of intents + ID's, so the loop takes care of that piece. Does removing enumerate still make sense in this case?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry missed this before was merged.
I mean that parse_data is a direct reference to event["parse_data"]["intent_ranking"][idx] so when you update parse_data["id"] you are also also updating event["parse_data"]["intent_ranking"][idx]["id"] so no need to re-assign parse_data to event["parse_data"]["intent_ranking"][idx]. And thus you no longer need to know idx.

Copy link
Contributor Author

@b-quachtran b-quachtran Jul 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@joejuzl Does python pass by reference though? I understood it as you have to use the index to update list elements in a loop.

return event

def _close(self) -> None:
self.producer.close()
49 changes: 49 additions & 0 deletions tests/core/test_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ async def test_kafka_broker_from_config():
topic="topic",
partition_by_sender=True,
security_protocol="SASL_PLAINTEXT",
convert_intent_id_to_string=True,
)

assert actual.url == expected.url
Expand All @@ -270,6 +271,54 @@ async def test_kafka_broker_from_config():
assert actual.sasl_mechanism == expected.sasl_mechanism
assert actual.topic == expected.topic
assert actual.partition_by_sender == expected.partition_by_sender
assert actual.convert_intent_id_to_string == expected.convert_intent_id_to_string


async def test_kafka_broker_convert_intent_id_to_string():
user_event = {
"timestamp": 1517821726.200036,
"metadata": {},
"parse_data": {
"entities": [],
"intent": {"confidence": 0.54, "name": "greet", "id": 7703045398849936579},
"message_id": "987654321",
"metadata": {},
"text": "/greet",
"intent_ranking": [
{"confidence": 0.54, "name": "greet", "id": 7703045398849936579},
{"confidence": 0.31, "name": "goodbye", "id": -5127945386715371244},
{"confidence": 0.15, "name": "default", "id": 1699173715362944540},
],
},
"event": "user",
"text": "/greet",
"input_channel": "rest",
"message_id": "987654321",
}
actual = KafkaEventBroker(
"localhost",
sasl_username="username",
sasl_password="password",
sasl_mechanism="PLAIN",
topic="topic",
partition_by_sender=True,
security_protocol="SASL_PLAINTEXT",
convert_intent_id_to_string=True,
)

converted_user_event = actual._convert_intent_id_to_string(user_event)
intent_ranking = user_event["parse_data"]["intent_ranking"]
converted_intent_ranking = converted_user_event["parse_data"]["intent_ranking"]

assert converted_user_event["parse_data"]["intent"]["id"] == str(
user_event["parse_data"]["intent"]["id"]
)
assert all(
converted_parse_data["id"] == str(parse_data["id"])
for parse_data, converted_parse_data in zip(
intent_ranking, converted_intent_ranking
)
)


@pytest.mark.parametrize(
Expand Down