Skip to content

Commit

Permalink
Revert "Merge pull request #137 from salute-developers/feature/DPNLPF…
Browse files Browse the repository at this point in the history
…_2142_stream_responses"

This reverts commit 616b955, reversing
changes made to a5a2ac2.
  • Loading branch information
Dmitrii Proskurin committed Oct 3, 2023
1 parent 616b955 commit f4fbd57
Show file tree
Hide file tree
Showing 47 changed files with 541 additions and 817 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,3 @@ htmlcov
tests/TestPass.csv
tests/parsed_log.csv
_trial_marker
/test.py
65 changes: 35 additions & 30 deletions core/basic_models/actions/basic_actions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# coding: utf-8
import asyncio
import random
from typing import Union, Dict, List, Any, Optional, AsyncGenerator
from typing import Union, Dict, List, Any, Optional

import core.logging.logger_constants as log_const
from core.basic_models.actions.command import Command
Expand Down Expand Up @@ -32,9 +33,8 @@ def __init__(self, items: Optional[Dict[str, Any]] = None, id: Optional[str] = N
self.version = items.get("version", -1)

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
params: Optional[Dict[str, Union[str, float, int]]] = None) -> Optional[List[Command]]:
raise NotImplementedError
yield

def on_run_error(self, text_preprocessing_result: BaseTextPreprocessingResult, user: BaseUser):
log("exc_handler: Action failed to run. Return None. MESSAGE: %(masked_message)s.", user,
Expand Down Expand Up @@ -72,8 +72,11 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):
self.nodes = items.get("nodes") or {}

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
yield Command(self.command, self.nodes, self.id, request_type=self.request_type, request_data=self.request_data)
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
commands.append(Command(self.command, self.nodes, self.id, request_type=self.request_type,
request_data=self.request_data))
return commands


class RequirementAction(Action):
Expand Down Expand Up @@ -102,10 +105,11 @@ def build_internal_item(self) -> str:
return self._item

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
if self.requirement.check(text_preprocessing_result, user, params):
async for command in self.internal_item.run(user, text_preprocessing_result, params):
yield command
commands.extend(await self.internal_item.run(user, text_preprocessing_result, params) or [])
return commands


class ChoiceAction(Action):
Expand Down Expand Up @@ -137,18 +141,18 @@ def build_else_item(self) -> Optional[str]:
return self._else_item

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
choice_is_made = False
for item in self.items:
checked = item.requirement.check(text_preprocessing_result, user, params)
if checked:
async for command in item.internal_item.run(user, text_preprocessing_result, params):
yield command
commands.extend(await item.internal_item.run(user, text_preprocessing_result, params) or [])
choice_is_made = True
break
if not choice_is_made and self._else_item:
async for command in self.else_item.run(user, text_preprocessing_result, params):
yield command
commands.extend(await self.else_item.run(user, text_preprocessing_result, params) or [])
return commands


class ElseAction(Action):
Expand Down Expand Up @@ -185,16 +189,14 @@ def build_item(self) -> str:
def build_else_item(self) -> Optional[str]:
return self._else_item

async def run(
self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Optional[Dict[str, Union[str, float, int]]]] = None
) -> AsyncGenerator[Command, None]:
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Optional[Dict[str, Union[str, float, int]]]] = None) -> List[Command]:
commands = []
if self.requirement.check(text_preprocessing_result, user, params):
async for command in self.item.run(user, text_preprocessing_result, params):
yield command
commands.extend(await self.item.run(user, text_preprocessing_result, params) or [])
elif self._else_item:
async for command in self.else_item.run(user, text_preprocessing_result, params):
yield command
commands.extend(await self.else_item.run(user, text_preprocessing_result, params) or [])
return commands


class ActionOfActions(Action):
Expand All @@ -213,10 +215,11 @@ def build_actions(self) -> List[Action]:

class CompositeAction(ActionOfActions):
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
for action in self.actions:
async for command in action.run(user, text_preprocessing_result, params):
yield command
commands.extend(await action.run(user, text_preprocessing_result, params) or [])
return commands


class NonRepeatingAction(ActionOfActions):
Expand All @@ -228,7 +231,8 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):
self._last_action_ids_storage = items["last_action_ids_storage"]

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
last_ids = user.last_action_ids[self._last_action_ids_storage]
all_indexes = list(range(self._actions_count))
max_last_ids_count = self._actions_count - 1
Expand All @@ -238,8 +242,8 @@ async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreproces
action_index = random.choice(available_indexes)
action = self.actions[action_index]
last_ids.add(action_index)
async for command in action.run(user, text_preprocessing_result, params):
yield command
commands.extend(await action.run(user, text_preprocessing_result, params) or [])
return commands


class RandomAction(Action):
Expand All @@ -255,8 +259,9 @@ def build_actions(self) -> List[Action]:
return self._raw_actions

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
pos = random.randint(0, len(self._raw_actions) - 1)
action = self.actions[pos]
async for command in action.run(user, text_preprocessing_result, params=params):
yield command
commands.extend(await action.run(user, text_preprocessing_result, params=params) or [])
return commands
14 changes: 7 additions & 7 deletions core/basic_models/actions/client_profile_actions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, Any, Optional, Union, AsyncGenerator
from typing import Dict, Any, Optional, Union, List

from core.basic_models.actions.command import Command
from core.basic_models.actions.string_actions import StringAction
Expand Down Expand Up @@ -73,15 +73,15 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):
self.request_data[KAFKA_REPLY_TOPIC] = config["template_settings"]["consumer_topic"]

async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
params: Optional[Dict[str, Union[str, float, int]]] = None) -> Optional[List[Command]]:
if self.behavior:
callback_id = user.message.generate_new_callback_id()
scenario_id = user.last_scenarios.last_scenario_name if hasattr(user, 'last_scenarios') else None
user.behaviors.add(callback_id, self.behavior, scenario_id,
text_preprocessing_result.raw, pickle_deepcopy(params))

async for command in super().run(user, text_preprocessing_result, params):
yield command
commands = await super().run(user, text_preprocessing_result, params)
return commands


class RememberThisAction(StringAction):
Expand Down Expand Up @@ -157,7 +157,7 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):
})

async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
params: Optional[Dict[str, Union[str, float, int]]] = None) -> Optional[List[Command]]:
self._nodes.update({
"consumer": {
"projectId": user.settings["template_settings"]["project_id"]
Expand All @@ -174,5 +174,5 @@ async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessing
if REPLY_TOPIC_KEY not in self.request_data and KAFKA_REPLY_TOPIC not in self.request_data:
self.request_data[KAFKA_REPLY_TOPIC] = user.settings["template_settings"]["consumer_topic"]

async for command in super().run(user, text_preprocessing_result, params):
yield command
commands = await super().run(user, text_preprocessing_result, params)
return commands
32 changes: 16 additions & 16 deletions core/basic_models/actions/counter_actions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# coding: utf-8
from typing import Union, Dict, Any, Optional, AsyncGenerator
from typing import Union, Dict, Any, Optional, List

from core.basic_models.actions.basic_actions import Action
from core.basic_models.actions.command import Command
Expand All @@ -22,26 +22,26 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):

class CounterIncrementAction(CounterAction):
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
user.counters[self.key].inc(self.value, self.lifetime)
return
yield
return commands


class CounterDecrementAction(CounterAction):
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
user.counters[self.key].dec(-self.value, self.lifetime)
return
yield
return commands


class CounterClearAction(CounterAction):
async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
user.counters.clear(self.key)
return
yield
return commands


class CounterSetAction(CounterAction):
Expand All @@ -58,10 +58,10 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):
self.time_shift = items.get("time_shift", 0)

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
user.counters[self.key].set(self.value, self.reset_time, self.time_shift)
return
yield
return commands


class CounterCopyAction(Action):
Expand All @@ -73,8 +73,8 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):
self.time_shift = items.get("time_shift", 0)

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
commands = []
value = user.counters[self.src].value
user.counters[self.dst].set(value, self.reset_time, self.time_shift)
return
yield
return commands
8 changes: 4 additions & 4 deletions core/basic_models/actions/external_actions.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Optional, Dict, Any, Union, AsyncGenerator
from typing import Optional, Dict, Any, Union, List

from core.basic_models.actions.basic_actions import CommandAction, Action
from core.basic_models.actions.basic_actions import action_factory
Expand All @@ -21,7 +21,7 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):
self._action_key: str = items["action"]

async def run(self, user: BaseUser, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
action: Action = user.descriptions["external_actions"][self._action_key]
async for command in action.run(user, text_preprocessing_result, params):
yield command
commands = await action.run(user, text_preprocessing_result, params)
return commands
26 changes: 13 additions & 13 deletions core/basic_models/actions/push_action.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# coding: utf-8
import base64
import uuid
from typing import Union, Dict, Any, Optional, AsyncGenerator
from typing import Union, Dict, List, Any, Optional

from core.basic_models.actions.command import Command
from core.basic_models.actions.string_actions import StringAction
Expand Down Expand Up @@ -69,7 +69,7 @@ def _render_request_data(self, action_params):
return request_data

async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
params = params or {}
command_params = {
"projectId": user.settings["template_settings"]["project_id"],
Expand All @@ -78,8 +78,9 @@ async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessing
"content": self._generate_command_context(user, text_preprocessing_result, params),
}
requests_data = self._render_request_data(params)
yield Command(self.command, command_params, self.id, request_type=self.request_type,
request_data=requests_data, need_payload_wrap=False, need_message_name=False)
commands = [Command(self.command, command_params, self.id, request_type=self.request_type,
request_data=requests_data, need_payload_wrap=False, need_message_name=False)]
return commands


class PushAuthenticationActionHttp(PushAction):
Expand Down Expand Up @@ -132,12 +133,11 @@ def _create_authorization_token(self, items: Dict[str, Any]) -> str:
return authorization_token

async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
params = params or {}
collected = user.parametrizer.collect(text_preprocessing_result, filter_params={"command": self.command})
params.update(collected)
async for command in self.http_request_action.run(user, text_preprocessing_result, params):
yield command
return await self.http_request_action.run(user, text_preprocessing_result, params)


class GetRuntimePermissionsAction(PushAction):
Expand Down Expand Up @@ -167,7 +167,7 @@ def __init__(self, items: Dict[str, Any], id: Optional[str] = None):
self.command = GET_RUNTIME_PERMISSIONS

async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
params = params or {}
scenario_id = user.last_scenarios.last_scenario_name
user.behaviors.add(user.message.generate_new_callback_id(), self.behavior, scenario_id,
Expand All @@ -183,8 +183,9 @@ async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessing
}
}
command_params = self._generate_command_context(user, text_preprocessing_result, params)
yield Command(self.command, command_params, self.id, request_type=self.request_type,
request_data=self.request_data, need_payload_wrap=False, need_message_name=False)
commands = [Command(self.command, command_params, self.id, request_type=self.request_type,
request_data=self.request_data, need_payload_wrap=False, need_message_name=False)]
return commands


class PushActionHttp(PushAction):
Expand Down Expand Up @@ -309,7 +310,7 @@ def _create_instance_of_http_request_action(self, items: Dict[str, Any], id: Opt
self.http_request_action = HTTPRequestAction(items, id)

async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessingResult,
params: Optional[Dict[str, Union[str, float, int]]] = None) -> AsyncGenerator[Command, None]:
params: Optional[Dict[str, Union[str, float, int]]] = None) -> List[Command]:
params = params or {}
collected = user.parametrizer.collect(text_preprocessing_result, filter_params={"command": self.command})
params.update(collected)
Expand All @@ -330,5 +331,4 @@ async def run(self, user: User, text_preprocessing_result: BaseTextPreprocessing
"payload": self.payload
}
self.http_request_action.method_params["json"] = request_body_parameters
async for command in self.http_request_action.run(user, text_preprocessing_result, params):
yield command
return await self.http_request_action.run(user, text_preprocessing_result, params)
Loading

0 comments on commit f4fbd57

Please sign in to comment.