Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DPNLPF-2142: stream responses #137

Merged
merged 11 commits into from
Sep 26, 2023
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ htmlcov
tests/TestPass.csv
tests/parsed_log.csv
_trial_marker
/test.py
65 changes: 30 additions & 35 deletions core/basic_models/actions/basic_actions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# coding: utf-8
import asyncio
import random
from typing import Union, Dict, List, Any, Optional
from typing import Union, Dict, List, Any, Optional, AsyncGenerator

import core.logging.logger_constants as log_const
from core.basic_models.actions.command import Command
Expand Down Expand Up @@ -33,8 +32,9 @@ def __init__(self, items: Optional[Dict[str, Any]] = None, id: Optional[str] = N
self.version = items.get("version", -1)

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> Optional[List[Command]]:
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
raise NotImplementedError
yield

def on_run_error(self, text_preprocessing_result: BaseTextPreprocessingResult, user: BaseUser):
log("exc_handler: Action failed to run. Return None. MESSAGE: %(masked_message)s.", user,
Expand Down Expand Up @@ -72,11 +72,8 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):
self.nodes = items.get("nodes") or {}

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
commands.append(Command(self.command, self.nodes, self.id, request_type=self.request_type,
request_data=self.request_data))
return commands
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
yield Command(self.command, self.nodes, self.id, request_type=self.request_type, request_data=self.request_data)


class RequirementAction(Action):
Expand Down Expand Up @@ -105,11 +102,10 @@ def build_internal_item(self) -> str:
return self._item

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
if self.requirement.check(text_preprocessing_result, user, params):
commands.extend(await self.internal_item.run(user, text_preprocessing_result, params) or [])
return commands
async for command in self.internal_item.run(user, text_preprocessing_result, params):
yield command


class ChoiceAction(Action):
Expand Down Expand Up @@ -141,18 +137,18 @@ def build_else_item(self) -> Optional[str]:
return self._else_item

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
choice_is_made = False
for item in self.items:
checked = item.requirement.check(text_preprocessing_result, user, params)
if checked:
commands.extend(await item.internal_item.run(user, text_preprocessing_result, params) or [])
async for command in item.internal_item.run(user, text_preprocessing_result, params):
yield command
choice_is_made = True
break
if not choice_is_made and self._else_item:
commands.extend(await self.else_item.run(user, text_preprocessing_result, params) or [])
return commands
async for command in self.else_item.run(user, text_preprocessing_result, params):
yield command


class ElseAction(Action):
Expand Down Expand Up @@ -189,14 +185,16 @@ def build_item(self) -> str:
def build_else_item(self) -> Optional[str]:
return self._else_item

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Optional[Dict[str, Union[str, float, int]]]] = None) -> List[Command]:
commands = []
async def run(
self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Optional[Dict[str, Union[str, float, int]]]] = None
) -> AsyncGenerator[Command, None]:
if self.requirement.check(text_preprocessing_result, user, params):
commands.extend(await self.item.run(user, text_preprocessing_result, params) or [])
async for command in self.item.run(user, text_preprocessing_result, params):
yield command
elif self._else_item:
commands.extend(await self.else_item.run(user, text_preprocessing_result, params) or [])
return commands
async for command in self.else_item.run(user, text_preprocessing_result, params):
yield command


class ActionOfActions(Action):
Expand All @@ -215,11 +213,10 @@ def build_actions(self) -> List[Action]:

class CompositeAction(ActionOfActions):
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
for action in self.actions:
commands.extend(await action.run(user, text_preprocessing_result, params) or [])
return commands
async for command in action.run(user, text_preprocessing_result, params):
yield command


class NonRepeatingAction(ActionOfActions):
Expand All @@ -231,8 +228,7 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):
self._last_action_ids_storage = items["last_action_ids_storage"]

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
last_ids = user.last_action_ids[self._last_action_ids_storage]
all_indexes = list(range(self._actions_count))
max_last_ids_count = self._actions_count - 1
Expand All @@ -242,8 +238,8 @@ async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreproces
action_index = random.choice(available_indexes)
action = self.actions[action_index]
last_ids.add(action_index)
commands.extend(await action.run(user, text_preprocessing_result, params) or [])
return commands
async for command in action.run(user, text_preprocessing_result, params):
yield command


class RandomAction(Action):
Expand All @@ -259,9 +255,8 @@ def build_actions(self) -> List[Action]:
return self._raw_actions

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
pos = random.randint(0, len(self._raw_actions) - 1)
action = self.actions[pos]
commands.extend(await action.run(user, text_preprocessing_result, params=params) or [])
return commands
async for command in action.run(user, text_preprocessing_result, params=params):
yield command
14 changes: 7 additions & 7 deletions core/basic_models/actions/client_profile_actions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, Any, Optional, Union, List
from typing import Dict, Any, Optional, Union, AsyncGenerator

from core.basic_models.actions.command import Command
from core.basic_models.actions.string_actions import StringAction
Expand Down Expand Up @@ -73,15 +73,15 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):
self.request_data[KAFKA_REPLY_TOPIC] = config["template_settings"]["consumer_topic"]

async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> Optional[List[Command]]:
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
if self.behavior:
callback_id = user.message.generate_new_callback_id()
scenario_id = user.last_scenarios.last_scenario_name if hasattr(user, 'last_scenarios') else None
user.behaviors.add(callback_id, self.behavior, scenario_id,
text_preprocessing_result.raw, pickle_deepcopy(params))

commands = await super().run(user, text_preprocessing_result, params)
return commands
async for command in super().run(user, text_preprocessing_result, params):
yield command


class RememberThisAction(StringAction):
Expand Down Expand Up @@ -157,7 +157,7 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):
})

async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> Optional[List[Command]]:
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
self._nodes.update({
"consumer": {
"projectId": user.settings["template_settings"]["project_id"]
Expand All @@ -174,5 +174,5 @@ async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessing
if REPLY_TOPIC_KEY not in self.request_data and KAFKA_REPLY_TOPIC not in self.request_data:
self.request_data[KAFKA_REPLY_TOPIC] = user.settings["template_settings"]["consumer_topic"]

commands = await super().run(user, text_preprocessing_result, params)
return commands
async for command in super().run(user, text_preprocessing_result, params):
yield command
32 changes: 16 additions & 16 deletions core/basic_models/actions/counter_actions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# coding: utf-8
from typing import Union, Dict, Any, Optional, List
from typing import Union, Dict, Any, Optional, AsyncGenerator

from core.basic_models.actions.basic_actions import Action
from core.basic_models.actions.command import Command
Expand All @@ -22,26 +22,26 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):

class CounterIncrementAction(CounterAction):
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
user.counters[self.key].inc(self.value, self.lifetime)
return commands
return
yield


class CounterDecrementAction(CounterAction):
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
user.counters[self.key].dec(-self.value, self.lifetime)
return commands
return
yield


class CounterClearAction(CounterAction):
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
user.counters.clear(self.key)
return commands
return
yield


class CounterSetAction(CounterAction):
Expand All @@ -58,10 +58,10 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):
self.time_shift = items.get("time_shift", 0)

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
user.counters[self.key].set(self.value, self.reset_time, self.time_shift)
return commands
return
yield


class CounterCopyAction(Action):
Expand All @@ -73,8 +73,8 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):
self.time_shift = items.get("time_shift", 0)

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
value = user.counters[self.src].value
user.counters[self.dst].set(value, self.reset_time, self.time_shift)
return commands
return
yield
8 changes: 4 additions & 4 deletions core/basic_models/actions/external_actions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Optional, Dict, Any, Union, List
from typing import Optional, Dict, Any, Union, AsyncGenerator

from core.basic_models.actions.basic_actions import CommandAction, Action
from core.basic_models.actions.basic_actions import action_factory
Expand All @@ -21,7 +21,7 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):
self._action_key: str = items["action"]

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
action: Action = user.descriptions["external_actions"][self._action_key]
commands = await action.run(user, text_preprocessing_result, params)
return commands
async for command in action.run(user, text_preprocessing_result, params):
yield command
26 changes: 13 additions & 13 deletions core/basic_models/actions/push_action.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# coding: utf-8
import base64
import uuid
from typing import Union, Dict, List, Any, Optional
from typing import Union, Dict, Any, Optional, AsyncGenerator

from core.basic_models.actions.command import Command
from core.basic_models.actions.string_actions import StringAction
Expand Down Expand Up @@ -69,7 +69,7 @@ def _render_request_data(self, action_params):
return request_data

async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
params = params or {}
command_params = {
"projectId": user.settings["template_settings"]["project_id"],
Expand All @@ -78,9 +78,8 @@ async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessing
"content": self._generate_command_context(user, text_preprocessing_result, params),
}
requests_data = self._render_request_data(params)
commands = [Command(self.command, command_params, self.id, request_type=self.request_type,
request_data=requests_data, need_payload_wrap=False, need_message_name=False)]
return commands
yield Command(self.command, command_params, self.id, request_type=self.request_type,
request_data=requests_data, need_payload_wrap=False, need_message_name=False)


class PushAuthenticationActionHttp(PushAction):
Expand Down Expand Up @@ -133,11 +132,12 @@ def _create_authorization_token(self, items: Dict[str, Any]) -> str:
return authorization_token

async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
params = params or {}
collected = user.parametrizer.collect(text_preprocessing_result, filter_params={"command": self.command})
params.update(collected)
return await self.http_request_action.run(user, text_preprocessing_result, params)
async for command in self.http_request_action.run(user, text_preprocessing_result, params):
yield command


class GetRuntimePermissionsAction(PushAction):
Expand Down Expand Up @@ -167,7 +167,7 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):
self.command = GET_RUNTIME_PERMISSIONS

async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
params = params or {}
scenario_id = user.last_scenarios.last_scenario_name
user.behaviors.add(user.message.generate_new_callback_id(), self.behavior, scenario_id,
Expand All @@ -183,9 +183,8 @@ async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessing
}
}
command_params = self._generate_command_context(user, text_preprocessing_result, params)
commands = [Command(self.command, command_params, self.id, request_type=self.request_type,
request_data=self.request_data, need_payload_wrap=False, need_message_name=False)]
return commands
yield Command(self.command, command_params, self.id, request_type=self.request_type,
request_data=self.request_data, need_payload_wrap=False, need_message_name=False)


class PushActionHttp(PushAction):
Expand Down Expand Up @@ -310,7 +309,7 @@ def _create_instance_of_http_request_action(self, items: Dict[str, Any], id: Opt
self.http_request_action = HTTPRequestAction(items, id)

async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
params = params or {}
collected = user.parametrizer.collect(text_preprocessing_result, filter_params={"command": self.command})
params.update(collected)
Expand All @@ -331,4 +330,5 @@ async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessing
"payload": self.payload
}
self.http_request_action.method_params["json"] = request_body_parameters
return await self.http_request_action.run(user, text_preprocessing_result, params)
async for command in self.http_request_action.run(user, text_preprocessing_result, params):
yield command
Loading
Loading