Skip to content

Commit

Permalink
Merge pull request #1608 from totvs/release/v2.0.0rc27
Browse files Browse the repository at this point in the history
Release/v2.0.0rc27
  • Loading branch information
renanllisboa authored Dec 5, 2024
2 parents a57ee41 + be195a8 commit a60a213
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 136 deletions.
2 changes: 1 addition & 1 deletion scripts/install_package.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ taskkill /f /im chromedriver.exe
echo -------------------------
echo Installing project...
echo -------------------------
pip install -U dist/tir_framework-2.0.0rc26.tar.gz
pip install -U dist/tir_framework-2.0.0rc27.tar.gz
pause >nul | set/p = Press any key to exit ...
6 changes: 2 additions & 4 deletions tir/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ class Webapp():
"""
def __init__(self, config_path="", autostart=True):
self.__webapp = WebappInternal(config_path, autostart)
self.__database = BaseDatabase(config_path, autostart=False)
self.config = ConfigLoader()
self.coverage = self.config.coverage

Expand Down Expand Up @@ -1279,7 +1278,7 @@ def QueryExecute(self, query, database_driver="", dbq_oracle_server="", database
>>> # Oracle Example:
>>> self.oHelper.QueryExecute("SELECT * FROM SA1T10", database_driver="Oracle in OraClient19Home1", dbq_oracle_server="Host:Port/oracle instance", database_server="SERVER_NAME", database_name="DATABASE_NAME", database_user="sa", database_password="123456")
"""
return self.__database.query_execute(query, database_driver, dbq_oracle_server, database_server, database_port, database_name, database_user, database_password)
return self.__webapp.query_execute(query, database_driver, dbq_oracle_server, database_server, database_port, database_name, database_user, database_password)

def GetConfigValue(self, json_key):
"""
Expand Down Expand Up @@ -1583,7 +1582,6 @@ class Poui():

def __init__(self, config_path="", autostart=True):
self.__poui = PouiInternal(config_path, autostart)
self.__database = BaseDatabase(config_path, autostart=False)
self.config = ConfigLoader()
self.coverage = self.config.coverage

Expand Down Expand Up @@ -1637,7 +1635,7 @@ def ClickCombo(self, field='', value='', position=1):
>>> oHelper.ClickCombo('Visão', 'Compras')
:return:
"""
self.__poui.click_poui_component(field, value, position, selector="div > po-combo", container=True)
self.__poui.click_combo(field, value, position)

def ClickSelect(self, field='', value='', position=1):
"""
Expand Down
48 changes: 11 additions & 37 deletions tir/technologies/core/base_database.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
from tir.technologies.core.base import Base
from tir.technologies.webapp_internal import WebappInternal
import pandas as pd
import pyodbc
import re
from tir.technologies.core.logging_config import logger
from tir.technologies.core.config import ConfigLoader

class BaseDatabase:

class BaseDatabase(Base):

def __init__(self, config_path="", autostart=True):
super().__init__(config_path, autostart=False)
self.webapp_internal = WebappInternal(config_path, autostart=False)
self.restart_counter = self.webapp_internal.restart_counter
def __init__(self):
self.config = ConfigLoader()

def odbc_connect(self, database_driver="", dbq_oracle_server="", database_server="", database_port=1521, database_name="", database_user="", database_password=""):
"""
Expand All @@ -27,16 +23,10 @@ def odbc_connect(self, database_driver="", dbq_oracle_server="", database_server
database_password = self.config.database_password if not database_password else database_password
dbq_oracle_server = self.config.dbq_oracle_server if not dbq_oracle_server else dbq_oracle_server

self.check_pyodbc_drivers(database_driver)

try:
if dbq_oracle_server:
connection = pyodbc.connect(f'DRIVER={database_driver};dbq={dbq_oracle_server};database={database_name};uid={database_user};pwd={database_password}')
else:
connection = pyodbc.connect(f'DRIVER={database_driver};server={database_server};port={database_port};database={database_name};uid={database_user};pwd={database_password}')
except Exception as error:
self.webapp_internal.restart_counter = 3
self.webapp_internal.log_error(str(error))
if dbq_oracle_server:
connection = pyodbc.connect(f'DRIVER={database_driver};dbq={dbq_oracle_server};database={database_name};uid={database_user};pwd={database_password}')
else:
connection = pyodbc.connect(f'DRIVER={database_driver};server={database_server};port={database_port};database={database_name};uid={database_user};pwd={database_password}')

return connection

Expand All @@ -45,10 +35,7 @@ def test_odbc_connection(self, connection):
:param connection:
:return: cursor attribute if connection ok else return False
"""
try:
return connection.cursor()
except:
return False
return connection.cursor()

def connect_database(self, query="", database_driver="", dbq_oracle_server="", database_server="", database_port=1521, database_name="", database_user="", database_password=""):

Expand All @@ -75,14 +62,6 @@ def disconnect_database(self, connection):
else:
logger().info('DataBase connection already stopped')

def check_pyodbc_drivers(self, driver_database):
if not next(iter(list(
filter(lambda x: x == driver_database.lower(), list(map(lambda x: x.lower(), pyodbc.drivers()))))),
None):
error_message = f"Driver: '{driver_database}' isn't a valid driver name!"
self.webapp_internal.restart_counter = 3
self.webapp_internal.log_error(error_message)

def query_execute(self, query, database_driver, dbq_oracle_server, database_server, database_port, database_name, database_user, database_password):
"""
Return a dictionary if the query statement is a SELECT otherwise print a number of row
Expand Down Expand Up @@ -133,16 +112,11 @@ def query_execute(self, query, database_driver, dbq_oracle_server, database_serv
if re.findall(r'^(SELECT)', query.upper()):
df = pd.read_sql(sql=query, con=connection)
return (df.to_dict())
elif re.findall(r'^(UPDATE|DELETE|INSERT)', query.upper()):
self.cursor_execute(query, connection)
else:
self.webapp_internal.log_error(f"Not a valid query in {query}")
self.cursor_execute(query, connection)

def cursor_execute(self, query, connection):
cursor = connection.cursor()
try:
rowcount = cursor.execute(query).rowcount
except Exception as error:
self.webapp_internal.log_error(str(error))
rowcount = cursor.execute(query).rowcount
logger().info(f'{rowcount} row(s) affected')
connection.commit()
26 changes: 18 additions & 8 deletions tir/technologies/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,16 @@ class ConfigLoader:
This class is instantiated to contain all config information used throughout the execution of the methods.
"""

_instance = None
_json_data = None

def __init__(self, path="config.json"):
def __new__(cls, path="config.json"):
if cls._instance is None:
cls._instance = super(ConfigLoader, cls).__new__(cls)
cls._instance._initialize(path)
return cls._instance

def _initialize(self, path="config.json"):
if ConfigLoader._json_data is None:

if not path:
Expand All @@ -29,6 +36,8 @@ def __init__(self, path="config.json"):
raise Exception(f"JSON file issue: {e}. \n* Please check your config.json *")

if ConfigLoader._json_data:
for key, value in ConfigLoader._json_data.items():
setattr(self, key, value)

data = ConfigLoader._json_data

Expand Down Expand Up @@ -61,13 +70,6 @@ def __init__(self, path="config.json"):
self.skip_restart = ("SkipRestart" in data and bool(data["SkipRestart"]))
self.smart_test = ("SmartTest" in data and bool(data["SmartTest"]))
self.smart_erp = ("SmartERP" in data and bool(data["SmartERP"]))
self.valid_language = self.language != ""
self.initial_program = ""
self.routine = ""
self.date = ""
self.group = ""
self.branch = ""
self.module = ""
self.user_cfg = str(data["UserCfg"]) if "UserCfg" in data else ""
self.password_cfg = str(data["PasswordCfg"]) if "PasswordCfg" in data else ""
self.electron_binary_path = (str(data["BinPath"]) if "BinPath" in data else "")
Expand Down Expand Up @@ -104,6 +106,14 @@ def __init__(self, path="config.json"):
"SSLChromeInstallDisable" in data and bool(data["SSLChromeInstallDisable"]))
self.data_delimiter = str(data["DataDelimiter"]) if "DataDelimiter" in data else "/"
self.procedure_menu = str(data["ProcedureMenu"]) if "ProcedureMenu" in data else ""
self.valid_language = self.language != ""
self.initial_program = ""
self.routine = ""
self.date = ""
self.group = ""
self.branch = ""
self.module = ""
self.routine_type = ""

def check_keys(self, json_data):
valid_keys = [
Expand Down
14 changes: 7 additions & 7 deletions tir/technologies/core/numexec.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
import os


class NumExec(ConfigLoader):
class NumExec():

def __init__(self):
super().__init__()
self.config = ConfigLoader()

def post_exec(self, url, numexec_folder):

Expand All @@ -34,20 +34,20 @@ def post_exec(self, url, numexec_folder):
time.sleep(12)

if error:
response = str(f"STATUS: {status} Url: {url} ID: {self.num_exec} Error: {error}")
response = str(f"STATUS: {status} Url: {url} ID: {self.config.num_exec} Error: {error}")
else:
response = str(f"STATUS: {status} Url: {url} ID: {self.num_exec}")
response = str(f"STATUS: {status} Url: {url} ID: {self.config.num_exec}")

logger().debug(response)
if status not in success_response:

try:
path = Path(self.log_folder, numexec_folder)
path = Path(self.config.log_folder, numexec_folder)
os.makedirs(path)
except OSError:
pass

with open(Path(path, f"{self.num_exec}_TIR_{strftime}.txt"), "w") as json_log:
with open(Path(path, f"{self.config.num_exec}_TIR_{strftime}.txt"), "w") as json_log:
json_log.write(response)

return status in success_response
Expand All @@ -63,7 +63,7 @@ def send_request(self, url):
"https": None,
}

data = {'num_exec': self.num_exec, 'ip_exec': self.ipExec}
data = {'num_exec': self.config.num_exec, 'ip_exec': self.config.ipExec}

response = requests.post(url.strip(), json=data, proxies=proxies)

Expand Down
102 changes: 85 additions & 17 deletions tir/technologies/poui_internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ def __init__(self, config_path="", autostart=True):
self.tmenu_screen = None
self.grid_memo_field = False
self.range_multiplier = None
self.routine = None

if not Base.driver:
Base.driver = self.driver
Expand Down Expand Up @@ -1289,9 +1288,9 @@ def restart(self):


if self.config.routine:
if self.routine == 'SetLateralMenu':
if self.config.routine_type.lower() == 'setlateralmenu':
self.SetLateralMenu(self.config.routine, save_input=False)
elif self.routine == 'Program':
elif self.config.routine_type.lower() == 'program':
self.set_program(self.config.routine)

def driver_refresh(self):
Expand Down Expand Up @@ -3251,11 +3250,6 @@ def click_poui_component(self, field, value, position, selector, container):

self.poui_click(element)

else:
select_element = main_element.select('select') if hasattr(main_element, 'select') else None
if select_element:
select_element = next(iter(select_element))
self.select_combo(select_element, value, index=True, shadow_root=False)

def poui_click(self, element):

Expand Down Expand Up @@ -3419,14 +3413,11 @@ def POSearch(self, content, placeholder):
self.wait_element(term='po-page')
endtime = time.time() + self.config.time_out
while (not element and time.time() < endtime):
po_page = next(iter(
self.web_scrap(term="[class='po-page']", scrap_type=enum.ScrapType.CSS_SELECTOR,
main_container='body')),
None)

po_page = next(iter(self.web_scrap(term="[class='po-page']", scrap_type=enum.ScrapType.CSS_SELECTOR,
main_container='body')),None)
if po_page:
page_list = next(iter(po_page.find_all_next('div', 'po-page-list-filter-wrapper')), None)

page_list = po_page.find_all_next('div', 'po-page-list-filter-wrapper')
page_list = next(iter(list(filter(lambda x: self.element_is_displayed(x), page_list))),None)
if page_list:
input = page_list.select('input')

Expand All @@ -3440,14 +3431,15 @@ def POSearch(self, content, placeholder):

if not element:
self.log_error("Couldn't find element")

self.switch_to_iframe()
element().clear()
element().send_keys(content)

action = lambda: self.soup_to_selenium(next(iter(input.parent.select('span'))))
action = lambda: self.soup_to_selenium(next(iter(input.parent.select('po-icon'))))
ActionChains(self.driver).move_to_element(action()).click().perform()


def ClickTable(self, first_column, second_column, first_content, second_content, table_number, itens, click_cell, checkbox):
"""
Clicks on the Table of POUI component.
Expand Down Expand Up @@ -3764,6 +3756,7 @@ def click_popup(self, label):
if element:
self.poui_click(element)


def click_checkbox(self, label):
"""Click on the POUI Checkbox.
https://po-ui.io/documentation/po-checkbox
Expand Down Expand Up @@ -3793,3 +3786,78 @@ def click_checkbox(self, label):

if not container_element:
self.log_error(f"CheckBox '{label}' doesn't found!")

def click_combo(self, field, value, position):
'''Select a value for list combo inputs.
:param field: label of field
:type : str
:param value: value to input on field
:type : str
:param position:
:type : int
:return:
'''

main_element = None
success = None
position -= 1
current_value = None
replace = r'[\s,\.:]'

logger().info(f"Clicking on {field}")
self.wait_element(term='po-combo')

endtime = time.time() + self.config.time_out
while (not success and time.time() < endtime):
po_combo = self.web_scrap(term='po-combo', scrap_type=enum.ScrapType.CSS_SELECTOR, position=position, main_container='body')
if po_combo:
po_combo = next(iter(po_combo),None)
po_input = po_combo.find_next('input')
if po_input:
self.open_input_combo(po_combo)
self.click_po_list_box(value)
current_value = self.get_web_value(self.soup_to_selenium(po_input, twebview=True))
success = re.sub(replace, '', current_value).lower() == re.sub(replace, '', value).lower()
if not success:
self.log_error(f'Click on {value} of {field} Fail. Please Check')


def click_po_list_box(self, value):
'''
:param value: Value to select on po-list-box
:type str
:return:
'''
replace = r'[\s,\.:]'
value = value.strip().lower()
self.wait_element(term='po-listbox')

po_list_itens = self.web_scrap(term='.po-item-list', scrap_type=enum.ScrapType.CSS_SELECTOR,
main_container='body')

if po_list_itens:
item_filtered = next(iter(list(filter(lambda x: re.sub(replace, '', x.text.strip().lower()) ==
re.sub(replace, '', value), po_list_itens))), None)
if item_filtered:
self.scroll_to_element(self.soup_to_selenium(item_filtered, twebview=True))
self.click(self.soup_to_selenium(item_filtered, twebview=True))
else:
self.log_error(f'{value} not found')


def open_input_combo(self, po_combo):
'''
:param po_combo: po-combo object
:type: Bs4 object
:return:
'''

combo_container = next(iter(po_combo.select('.po-combo-container')),None)
combo_input = next(iter(po_combo.select('input')), None)

if combo_container:
closed_combo = lambda: self.soup_to_selenium(combo_container, twebview=True).get_attribute('hidden')
endtime = time.time() + self.config.time_out
while (closed_combo() and time.time() < endtime):
self.click(self.soup_to_selenium(combo_input, twebview=True))
Loading

0 comments on commit a60a213

Please sign in to comment.