Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DPNLPF-2142: stream responses #137

Merged
merged 11 commits into from
Sep 26, 2023
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ htmlcov
tests/TestPass.csv
tests/parsed_log.csv
_trial_marker
/test.py
42 changes: 18 additions & 24 deletions core/basic_models/actions/basic_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def __init__(self, items: Optional[Dict[str, Any]] = None, id: Optional[str] = N
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> Optional[List[Command]]:
raise NotImplementedError
yield

def on_run_error(self, text_preprocessing_result: BaseTextPreprocessingResult, user: BaseUser):
log("exc_handler: Action failed to run. Return None. MESSAGE: %(masked_message)s.", user,
Expand Down Expand Up @@ -73,10 +74,7 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
commands.append(Command(self.command, self.nodes, self.id, request_type=self.request_type,
request_data=self.request_data))
return commands
yield Command(self.command, self.nodes, self.id, request_type=self.request_type, request_data=self.request_data)


class RequirementAction(Action):
Expand Down Expand Up @@ -106,10 +104,9 @@ def build_internal_item(self) -> str:

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
if self.requirement.check(text_preprocessing_result, user, params):
commands.extend(await self.internal_item.run(user, text_preprocessing_result, params) or [])
return commands
async for command in self.internal_item.run(user, text_preprocessing_result, params):
yield command


class ChoiceAction(Action):
Expand Down Expand Up @@ -142,17 +139,17 @@ def build_else_item(self) -> Optional[str]:

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
choice_is_made = False
for item in self.items:
checked = item.requirement.check(text_preprocessing_result, user, params)
if checked:
commands.extend(await item.internal_item.run(user, text_preprocessing_result, params) or [])
async for command in item.internal_item.run(user, text_preprocessing_result, params):
yield command
choice_is_made = True
break
if not choice_is_made and self._else_item:
commands.extend(await self.else_item.run(user, text_preprocessing_result, params) or [])
return commands
async for command in self.else_item.run(user, text_preprocessing_result, params):
yield command


class ElseAction(Action):
Expand Down Expand Up @@ -191,12 +188,12 @@ def build_else_item(self) -> Optional[str]:

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Optional[Dict[str, Union[str, float, int]]]] = None) -> List[Command]:
commands = []
if self.requirement.check(text_preprocessing_result, user, params):
commands.extend(await self.item.run(user, text_preprocessing_result, params) or [])
async for command in self.item.run(user, text_preprocessing_result, params):
yield command
elif self._else_item:
commands.extend(await self.else_item.run(user, text_preprocessing_result, params) or [])
return commands
async for command in self.else_item.run(user, text_preprocessing_result, params):
yield command


class ActionOfActions(Action):
Expand All @@ -216,10 +213,9 @@ def build_actions(self) -> List[Action]:
class CompositeAction(ActionOfActions):
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
for action in self.actions:
commands.extend(await action.run(user, text_preprocessing_result, params) or [])
return commands
async for command in action.run(user, text_preprocessing_result, params):
yield command


class NonRepeatingAction(ActionOfActions):
Expand All @@ -232,7 +228,6 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
last_ids = user.last_action_ids[self._last_action_ids_storage]
all_indexes = list(range(self._actions_count))
max_last_ids_count = self._actions_count - 1
Expand All @@ -242,8 +237,8 @@ async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreproces
action_index = random.choice(available_indexes)
action = self.actions[action_index]
last_ids.add(action_index)
commands.extend(await action.run(user, text_preprocessing_result, params) or [])
return commands
async for command in action.run(user, text_preprocessing_result, params):
yield command


class RandomAction(Action):
Expand All @@ -260,8 +255,7 @@ def build_actions(self) -> List[Action]:

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
pos = random.randint(0, len(self._raw_actions) - 1)
action = self.actions[pos]
commands.extend(await action.run(user, text_preprocessing_result, params=params) or [])
return commands
async for command in action.run(user, text_preprocessing_result, params=params):
yield command
8 changes: 4 additions & 4 deletions core/basic_models/actions/client_profile_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessing
user.behaviors.add(callback_id, self.behavior, scenario_id,
text_preprocessing_result.raw, pickle_deepcopy(params))

commands = await super().run(user, text_preprocessing_result, params)
return commands
async for command in super().run(user, text_preprocessing_result, params):
yield command


class RememberThisAction(StringAction):
Expand Down Expand Up @@ -174,5 +174,5 @@ async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessing
if REPLY_TOPIC_KEY not in self.request_data and KAFKA_REPLY_TOPIC not in self.request_data:
self.request_data[KAFKA_REPLY_TOPIC] = user.settings["template_settings"]["consumer_topic"]

commands = await super().run(user, text_preprocessing_result, params)
return commands
async for command in super().run(user, text_preprocessing_result, params):
yield command
20 changes: 10 additions & 10 deletions core/basic_models/actions/counter_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,25 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):
class CounterIncrementAction(CounterAction):
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
user.counters[self.key].inc(self.value, self.lifetime)
return commands
return
yield


class CounterDecrementAction(CounterAction):
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
user.counters[self.key].dec(-self.value, self.lifetime)
return commands
return
yield


class CounterClearAction(CounterAction):
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
user.counters.clear(self.key)
return commands
return
yield


class CounterSetAction(CounterAction):
Expand All @@ -59,9 +59,9 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
user.counters[self.key].set(self.value, self.reset_time, self.time_shift)
return commands
return
yield


class CounterCopyAction(Action):
Expand All @@ -74,7 +74,7 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
value = user.counters[self.src].value
user.counters[self.dst].set(value, self.reset_time, self.time_shift)
return commands
return
yield
4 changes: 2 additions & 2 deletions core/basic_models/actions/external_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
action: Action = user.descriptions["external_actions"][self._action_key]
commands = await action.run(user, text_preprocessing_result, params)
return commands
async for command in action.run(user, text_preprocessing_result, params):
yield command
16 changes: 8 additions & 8 deletions core/basic_models/actions/push_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,8 @@ async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessing
"content": self._generate_command_context(user, text_preprocessing_result, params),
}
requests_data = self._render_request_data(params)
commands = [Command(self.command, command_params, self.id, request_type=self.request_type,
request_data=requests_data, need_payload_wrap=False, need_message_name=False)]
return commands
yield Command(self.command, command_params, self.id, request_type=self.request_type,
request_data=requests_data, need_payload_wrap=False, need_message_name=False)


class PushAuthenticationActionHttp(PushAction):
Expand Down Expand Up @@ -137,7 +136,8 @@ async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessing
params = params or {}
collected = user.parametrizer.collect(text_preprocessing_result, filter_params={"command": self.command})
params.update(collected)
return await self.http_request_action.run(user, text_preprocessing_result, params)
async for command in self.http_request_action.run(user, text_preprocessing_result, params):
yield command


class GetRuntimePermissionsAction(PushAction):
Expand Down Expand Up @@ -183,9 +183,8 @@ async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessing
}
}
command_params = self._generate_command_context(user, text_preprocessing_result, params)
commands = [Command(self.command, command_params, self.id, request_type=self.request_type,
request_data=self.request_data, need_payload_wrap=False, need_message_name=False)]
return commands
yield Command(self.command, command_params, self.id, request_type=self.request_type,
request_data=self.request_data, need_payload_wrap=False, need_message_name=False)


class PushActionHttp(PushAction):
Expand Down Expand Up @@ -331,4 +330,5 @@ async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessing
"payload": self.payload
}
self.http_request_action.method_params["json"] = request_body_parameters
return await self.http_request_action.run(user, text_preprocessing_result, params)
async for command in self.http_request_action.run(user, text_preprocessing_result, params):
yield command
40 changes: 9 additions & 31 deletions core/basic_models/actions/string_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def _get_rendered_tree_recursive(self, value: T, params: Dict, no_empty=False) -
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
raise NotImplementedError
yield


class StringAction(NodeAction):
Expand Down Expand Up @@ -142,9 +143,8 @@ async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreproces
}
})

commands = [Command(self.command, command_params, self.id, request_type=self.request_type,
request_data=self.request_data)]
return commands
yield Command(self.command, command_params, self.id, request_type=self.request_type,
request_data=self.request_data)


class AfinaAnswerAction(NodeAction):
Expand All @@ -169,7 +169,6 @@ async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreproces
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
params = user.parametrizer.collect(text_preprocessing_result, filter_params={"command": self.command})
answer_params = dict()
result = []

nodes = self.nodes.items() if self.nodes else []
for key, template in nodes:
Expand All @@ -180,9 +179,8 @@ async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreproces
answer_params[key] = rendered

if answer_params:
result = [Command(self.command, answer_params, self.id, request_type=self.request_type,
request_data=self.request_data)]
return result
yield Command(self.command, answer_params, self.id, request_type=self.request_type,
request_data=self.request_data)


class SDKAnswer(NodeAction):
Expand Down Expand Up @@ -240,7 +238,7 @@ class SDKAnswer(NodeAction):
['suggestions', 'buttons', INDEX_WILDCARD, 'title']]

def __init__(self, items: Dict[str, Any], id: Optional[str] = None):
super(SDKAnswer, self).__init__(items, id)
super().__init__(items, id)
self.command: str = ANSWER_TO_USER
if self._nodes == {}:
self._nodes = {i: items.get(i) for i in items if
Expand Down Expand Up @@ -274,21 +272,12 @@ def do_random(self, input_dict: Union[list, dict]):

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
result = []
params = user.parametrizer.collect(text_preprocessing_result, filter_params={"command": self.command})
rendered = self._get_rendered_tree(self.nodes, params, self.no_empty_nodes)
self.do_random(rendered)
if rendered or not self.no_empty_nodes:
result = [
Command(
self.command,
rendered,
self.id,
request_type=self.request_type,
request_data=self.request_data,
)
]
return result
yield Command(self.command, rendered, self.id,
request_type=self.request_type, request_data=self.request_data)


class SDKAnswerToUser(NodeAction):
Expand Down Expand Up @@ -427,8 +416,6 @@ def build_root(self):

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:

result = []
params = user.parametrizer.collect(text_preprocessing_result, filter_params={self.COMMAND: self.command})
rendered = self._get_rendered_tree(self.nodes[self.STATIC], params, self.no_empty_nodes)
if self._nodes[self.RANDOM_CHOICE]:
Expand All @@ -454,13 +441,4 @@ async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreproces
if part.requirement.check(text_preprocessing_result, user):
out.update(part.render(rendered))
if rendered or not self.no_empty_nodes:
result = [
Command(
self.command,
out,
self.id,
request_type=self.request_type,
request_data=self.request_data,
)
]
return result
yield Command(self.command, out, self.id, request_type=self.request_type, request_data=self.request_data)
12 changes: 6 additions & 6 deletions core/basic_models/actions/variable_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ def _set(self, user, value):

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
params = user.parametrizer.collect(text_preprocessing_result)
try:
# if path is wrong, it may fail with UndefinedError
Expand All @@ -48,7 +47,8 @@ async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreproces
value = None

self._set(user, value)
return commands
return
yield


class SetVariableAction(BaseSetVariableAction):
Expand Down Expand Up @@ -78,9 +78,9 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
user.variables.delete(self.key)
return commands
return
yield


class ClearVariablesAction(Action):
Expand All @@ -91,9 +91,9 @@ def __init__(self, items: Dict[str, Any] = None, id: Optional[str] = None):

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
user.variables.clear()
return commands
return
yield


class SetLocalVariableAction(BaseSetVariableAction):
Expand Down
Loading