diff --git a/scripts/install_package.cmd b/scripts/install_package.cmd index afbc07d9..69beb1dc 100644 --- a/scripts/install_package.cmd +++ b/scripts/install_package.cmd @@ -13,5 +13,5 @@ taskkill /f /im chromedriver.exe echo ------------------------- echo Installing project... echo ------------------------- -pip install -U dist/tir_framework-1.20.27.tar.gz +pip install -U dist/tir_framework-1.20.28.tar.gz pause >nul | set/p = Press any key to exit ... diff --git a/setup.py b/setup.py index b1d61d72..b6cdfd7d 100644 --- a/setup.py +++ b/setup.py @@ -48,7 +48,7 @@ 'pyodbc==4.0.39', 'psutil==5.9.5', 'lxml==4.6.5', - 'opencv-python==4.6.0.66', + 'opencv-python==4.8.1.78', 'webdriver-manager' ], packages=find_packages(), diff --git a/tir/main.py b/tir/main.py index 9cc7e818..53a0a82b 100644 --- a/tir/main.py +++ b/tir/main.py @@ -22,7 +22,6 @@ class Webapp(): """ def __init__(self, config_path="", autostart=True): self.__webapp = WebappInternal(config_path, autostart) - self.__database = BaseDatabase(config_path, autostart=False) self.config = ConfigLoader() self.coverage = self.config.coverage @@ -1279,7 +1278,7 @@ def QueryExecute(self, query, database_driver="", dbq_oracle_server="", database >>> # Oracle Example: >>> self.oHelper.QueryExecute("SELECT * FROM SA1T10", database_driver="Oracle in OraClient19Home1", dbq_oracle_server="Host:Port/oracle instance", database_server="SERVER_NAME", database_name="DATABASE_NAME", database_user="sa", database_password="123456") """ - return self.__database.query_execute(query, database_driver, dbq_oracle_server, database_server, database_port, database_name, database_user, database_password) + return self.__webapp.query_execute(query, database_driver, dbq_oracle_server, database_server, database_port, database_name, database_user, database_password) def GetConfigValue(self, json_key): """ @@ -1583,7 +1582,6 @@ class Poui(): def __init__(self, config_path="", autostart=True): self.__poui = PouiInternal(config_path, autostart) - self.__database = BaseDatabase(config_path, autostart=False) self.config = ConfigLoader() self.coverage = self.config.coverage @@ -1637,7 +1635,7 @@ def ClickCombo(self, field='', value='', position=1): >>> oHelper.ClickCombo('Visão', 'Compras') :return: """ - self.__poui.click_poui_component(field, value, position, selector="div > po-combo", container=True) + self.__poui.click_combo(field, value, position) def ClickSelect(self, field='', value='', position=1): """ diff --git a/tir/technologies/core/base.py b/tir/technologies/core/base.py index a6ccc2a5..1dde20e4 100644 --- a/tir/technologies/core/base.py +++ b/tir/technologies/core/base.py @@ -782,6 +782,29 @@ def search_zindex(self,element): zindex = int(element.attrs['style'].split("z-index:")[1].split(";")[0].strip()) return zindex + + def collect_zindex(self, reverse=True): + """ + returns z-index list in decrescent order by default or in crescent order if reverse is False. + """ + + soup = self.get_current_DOM() + + style_elements = soup.find_all(style=True) + + if style_elements: + zindex_list = list(filter(lambda x: 'z-index' in x['style'], style_elements)) + if zindex_list: + zindex_list_filtered = list(map(lambda x: x.attrs['style'].split('z-index')[1].split(';')[0].split(':')[1].strip(), zindex_list)) + return sorted(list(map(int, zindex_list_filtered)), reverse=reverse) + + def return_last_zindex(self): + """ + returns the last z-index value in the page. + """ + zindex_list = self.collect_zindex(reverse=True) + if zindex_list: + return next(iter(zindex_list), None) def select_combo(self, element, option, index=False, shadow_root=True, locator=False): """ diff --git a/tir/technologies/core/base_database.py b/tir/technologies/core/base_database.py index cb7f87af..4ae41fbf 100644 --- a/tir/technologies/core/base_database.py +++ b/tir/technologies/core/base_database.py @@ -1,17 +1,13 @@ -from tir.technologies.core.base import Base -from tir.technologies.webapp_internal import WebappInternal import pandas as pd import pyodbc import re from tir.technologies.core.logging_config import logger +from tir.technologies.core.config import ConfigLoader +class BaseDatabase: -class BaseDatabase(Base): - - def __init__(self, config_path="", autostart=True): - super().__init__(config_path, autostart=False) - self.webapp_internal = WebappInternal(config_path, autostart=False) - self.restart_counter = self.webapp_internal.restart_counter + def __init__(self): + self.config = ConfigLoader() def odbc_connect(self, database_driver="", dbq_oracle_server="", database_server="", database_port=1521, database_name="", database_user="", database_password=""): """ @@ -27,16 +23,10 @@ def odbc_connect(self, database_driver="", dbq_oracle_server="", database_server database_password = self.config.database_password if not database_password else database_password dbq_oracle_server = self.config.dbq_oracle_server if not dbq_oracle_server else dbq_oracle_server - self.check_pyodbc_drivers(database_driver) - - try: - if dbq_oracle_server: - connection = pyodbc.connect(f'DRIVER={database_driver};dbq={dbq_oracle_server};database={database_name};uid={database_user};pwd={database_password}') - else: - connection = pyodbc.connect(f'DRIVER={database_driver};server={database_server};port={database_port};database={database_name};uid={database_user};pwd={database_password}') - except Exception as error: - self.webapp_internal.restart_counter = 3 - self.webapp_internal.log_error(str(error)) + if dbq_oracle_server: + connection = pyodbc.connect(f'DRIVER={database_driver};dbq={dbq_oracle_server};database={database_name};uid={database_user};pwd={database_password}') + else: + connection = pyodbc.connect(f'DRIVER={database_driver};server={database_server};port={database_port};database={database_name};uid={database_user};pwd={database_password}') return connection @@ -45,10 +35,7 @@ def test_odbc_connection(self, connection): :param connection: :return: cursor attribute if connection ok else return False """ - try: - return connection.cursor() - except: - return False + return connection.cursor() def connect_database(self, query="", database_driver="", dbq_oracle_server="", database_server="", database_port=1521, database_name="", database_user="", database_password=""): @@ -74,15 +61,7 @@ def disconnect_database(self, connection): logger().info('DataBase connection stopped') else: logger().info('DataBase connection already stopped') - - def check_pyodbc_drivers(self, driver_database): - if not next(iter(list( - filter(lambda x: x == driver_database.lower(), list(map(lambda x: x.lower(), pyodbc.drivers()))))), - None): - error_message = f"Driver: '{driver_database}' isn't a valid driver name!" - self.webapp_internal.restart_counter = 3 - self.webapp_internal.log_error(error_message) - + def query_execute(self, query, database_driver, dbq_oracle_server, database_server, database_port, database_name, database_user, database_password): """ Return a dictionary if the query statement is a SELECT otherwise print a number of row @@ -133,16 +112,11 @@ def query_execute(self, query, database_driver, dbq_oracle_server, database_serv if re.findall(r'^(SELECT)', query.upper()): df = pd.read_sql(sql=query, con=connection) return (df.to_dict()) - elif re.findall(r'^(UPDATE|DELETE|INSERT)', query.upper()): - self.cursor_execute(query, connection) else: - self.webapp_internal.log_error(f"Not a valid query in {query}") + self.cursor_execute(query, connection) def cursor_execute(self, query, connection): cursor = connection.cursor() - try: - rowcount = cursor.execute(query).rowcount - except Exception as error: - self.webapp_internal.log_error(str(error)) + rowcount = cursor.execute(query).rowcount logger().info(f'{rowcount} row(s) affected') connection.commit() diff --git a/tir/technologies/core/config.py b/tir/technologies/core/config.py index c808e10d..4e27942d 100644 --- a/tir/technologies/core/config.py +++ b/tir/technologies/core/config.py @@ -1,163 +1,184 @@ import json import os from datetime import datetime +import sys class ConfigLoader: """ This class is instantiated to contain all config information used throughout the execution of the methods. """ - def __init__(self, path="config.json"): - valid = os.path.isfile(path) - if valid: - with open(path) as json_data_file: - try: - data = json.load(json_data_file) - if self.validar_chaves(data): - raise Exception(self.validar_chaves(data)) - except Exception as e: - raise Exception(f"JSON file issue: {e}. \n* Please check your config.json *") - else: - data = {} + _instance = None + _json_data = None - today = datetime.today() + def __new__(cls, path="config.json"): + if cls._instance is None: + cls._instance = super(ConfigLoader, cls).__new__(cls) + cls._instance._initialize(path) + return cls._instance - self.json_data = data - self.autostart = True - self.ipExec = str(data["ipExec"]) if "ipExec" in data else "" - self.url_set_start_exec = str(data["UrlSetStartExec"]) if "UrlSetStartExec" in data else "" - self.url_set_end_exec = str(data["UrlSetEndExec"]) if "UrlSetEndExec" in data else "" - self.screenshot = bool(data["ScreenShot"]) if "ScreenShot" in data else True - self.country = str(data["Country"]) if "Country" in data else "BRA" - self.execution_id = str(data["ExecId"]) if "ExecId" in data else today.strftime('%Y%m%d') - self.num_exec = str(data["NumExec"]) if "NumExec" in data else "" - self.issue = str(data["MotExec"]) if "MotExec" in data else "" - self.url = str(data["Url"]) if "Url" in data else "" - self.browser = str(data["Browser"]) if "Browser" in data else "" - self.environment = str(data["Environment"]) if "Environment" in data else "" - self.user = str(data["User"]) if "User" in data else "" - self.password = str(data["Password"]) if "Password" in data else "" - self.language = str(data["Language"]) if "Language" in data else "" - self.skip_environment = ("SkipEnvironment" in data and bool(data["SkipEnvironment"])) - self.headless = ("Headless" in data and bool(data["Headless"])) - self.log_folder = str(data["LogFolder"]) if "LogFolder" in data else "" - self.log_file = ("LogFile" in data and bool(data["LogFile"])) - self.debug_log = ("DebugLog" in data and bool(data["DebugLog"])) - self.time_out = int(data["TimeOut"]) if "TimeOut" in data else 90 - self.parameter_menu = str(data["ParameterMenu"]) if "ParameterMenu" in data else "" - self.screenshot_folder = str(data["ScreenshotFolder"]) if "ScreenshotFolder" in data else "" - self.coverage = ("Coverage" in data and bool(data["Coverage"])) - self.skip_restart = ("SkipRestart" in data and bool(data["SkipRestart"])) - self.smart_test = ("SmartTest" in data and bool(data["SmartTest"])) - self.smart_erp = ("SmartERP" in data and bool(data["SmartERP"])) - self.valid_language = self.language != "" - self.initial_program = "" - self.routine = "" - self.date = "" - self.group = "" - self.branch = "" - self.module = "" - self.user_cfg = str(data["UserCfg"]) if "UserCfg" in data else "" - self.password_cfg = str(data["PasswordCfg"]) if "PasswordCfg" in data else "" - self.electron_binary_path = (str(data["BinPath"]) if "BinPath" in data else "") - self.csv_path = (str(data["CSVPath"]) if "CSVPath" in data else "") - self.database_driver = str(data["DBDriver"]) if "DBDriver" in data else "" - self.database_server = str(data["DBServer"]) if "DBServer" in data else "" - self.database_port = str(data["DBPort"]) if "DBPort" in data else "" - self.database_name = str(data["DBName"]) if "DBName" in data else "" - self.database_user = str(data["DBUser"]) if "DBUser" in data else "" - self.database_password = str(data["DBPassword"]) if "DBPassword" in data else "" - self.dbq_oracle_server = str(data["DBQOracleServer"]) if "DBQOracleServer" in data else "" - self.url_tss = str(data["URL_TSS"]) if "URL_TSS" in data else "" - self.start_program = str(data["StartProgram"]) if "StartProgram" in data else "" - self.new_log = ("NewLog" in data and bool(data["NewLog"])) - self.logurl1 = str(data["LogUrl1"]) if "LogUrl1" in data else "" - self.logurl2 = str(data["LogUrl2"]) if "LogUrl2" in data else "" - self.parameter_url = bool(data["ParameterUrl"]) if "ParameterUrl" in data else False - self.log_http = str(data["LogHttp"]) if "LogHttp" in data else "" - self.baseline_spool = str(data["BaseLine_Spool"]) if "BaseLine_Spool" in data else "" - self.check_value = (bool(data["CheckValue"]) if "CheckValue" in data else None) - self.poui_login = bool(data["POUILogin"]) if "POUILogin" in data else False - self.poui = bool(data["POUI"]) if "POUI" in data else False - self.log_info_config = bool(data["LogInfoConfig"]) if "LogInfoConfig" in data else False - self.release = str(data["Release"]) if "Release" in data else "12.1.2210" - self.top_database = str(data["TopDataBase"]) if "TopDataBase" in data else "MSSQL" - self.lib_version = str(data["Lib"]) if "Lib" in data else "lib_version" - self.build_version = str(data["Build"]) if "Build" in data else "build_version" - self.appserver_folder = str(data["AppServerFolder"]) if "AppServerFolder" in data else "" - self.destination_folder = str(data["DestinationFolder"]) if "DestinationFolder" in data else "" - self.appserver_service = str(data["AppServerService"]) if "AppServerService" in data else "" - self.check_dump = ("CheckDump" in data and bool(data["CheckDump"])) - self.chromedriver_auto_install = ("ChromeDriverAutoInstall" in data and bool(data["ChromeDriverAutoInstall"])) - self.ssl_chrome_auto_install_disable = ( - "SSLChromeInstallDisable" in data and bool(data["SSLChromeInstallDisable"])) - self.data_delimiter = str(data["DataDelimiter"]) if "DataDelimiter" in data else "/" - self.procedure_menu = str(data["ProcedureMenu"]) if "ProcedureMenu" in data else "" + def _initialize(self, path="config.json"): + if ConfigLoader._json_data is None: - def validar_chaves(self, json_data): + if not path: + path = os.path.join(sys.path[0], r"config.json") + + valid = os.path.isfile(path) + + if valid: + with open(path) as json_data_file: + try: + data = json.load(json_data_file) + if self.check_keys(data): + raise Exception(self.check_keys(data)) + ConfigLoader._json_data = data + except Exception as e: + raise Exception(f"JSON file issue: {e}. \n* Please check your config.json *") + + if ConfigLoader._json_data: + for key, value in ConfigLoader._json_data.items(): + setattr(self, key, value) + + data = ConfigLoader._json_data + + today = datetime.today() + self.json_data = data + self.autostart = True + self.ipExec = str(data["ipExec"]) if "ipExec" in data else "" + self.url_set_start_exec = str(data["UrlSetStartExec"]) if "UrlSetStartExec" in data else "" + self.url_set_end_exec = str(data["UrlSetEndExec"]) if "UrlSetEndExec" in data else "" + self.screenshot = bool(data["ScreenShot"]) if "ScreenShot" in data else True + self.country = str(data["Country"]) if "Country" in data else "BRA" + self.execution_id = str(data["ExecId"]) if "ExecId" in data else today.strftime('%Y%m%d') + self.num_exec = str(data["NumExec"]) if "NumExec" in data else "" + self.issue = str(data["MotExec"]) if "MotExec" in data else "" + self.url = str(data["Url"]) if "Url" in data else "" + self.browser = str(data["Browser"]) if "Browser" in data else "" + self.environment = str(data["Environment"]) if "Environment" in data else "" + self.user = str(data["User"]) if "User" in data else "" + self.password = str(data["Password"]) if "Password" in data else "" + self.language = str(data["Language"]) if "Language" in data else "" + self.skip_environment = ("SkipEnvironment" in data and bool(data["SkipEnvironment"])) + self.headless = ("Headless" in data and bool(data["Headless"])) + self.log_folder = str(data["LogFolder"]) if "LogFolder" in data else "" + self.log_file = ("LogFile" in data and bool(data["LogFile"])) + self.debug_log = ("DebugLog" in data and bool(data["DebugLog"])) + self.time_out = int(data["TimeOut"]) if "TimeOut" in data else 90 + self.parameter_menu = str(data["ParameterMenu"]) if "ParameterMenu" in data else "" + self.screenshot_folder = str(data["ScreenshotFolder"]) if "ScreenshotFolder" in data else "" + self.coverage = ("Coverage" in data and bool(data["Coverage"])) + self.skip_restart = ("SkipRestart" in data and bool(data["SkipRestart"])) + self.smart_test = ("SmartTest" in data and bool(data["SmartTest"])) + self.smart_erp = ("SmartERP" in data and bool(data["SmartERP"])) + self.user_cfg = str(data["UserCfg"]) if "UserCfg" in data else "" + self.password_cfg = str(data["PasswordCfg"]) if "PasswordCfg" in data else "" + self.electron_binary_path = (str(data["BinPath"]) if "BinPath" in data else "") + self.csv_path = (str(data["CSVPath"]) if "CSVPath" in data else "") + self.database_driver = str(data["DBDriver"]) if "DBDriver" in data else "" + self.database_server = str(data["DBServer"]) if "DBServer" in data else "" + self.database_port = str(data["DBPort"]) if "DBPort" in data else "" + self.database_name = str(data["DBName"]) if "DBName" in data else "" + self.database_user = str(data["DBUser"]) if "DBUser" in data else "" + self.database_password = str(data["DBPassword"]) if "DBPassword" in data else "" + self.dbq_oracle_server = str(data["DBQOracleServer"]) if "DBQOracleServer" in data else "" + self.url_tss = str(data["URL_TSS"]) if "URL_TSS" in data else "" + self.start_program = str(data["StartProgram"]) if "StartProgram" in data else "" + self.new_log = ("NewLog" in data and bool(data["NewLog"])) + self.logurl1 = str(data["LogUrl1"]) if "LogUrl1" in data else "" + self.logurl2 = str(data["LogUrl2"]) if "LogUrl2" in data else "" + self.parameter_url = bool(data["ParameterUrl"]) if "ParameterUrl" in data else False + self.log_http = str(data["LogHttp"]) if "LogHttp" in data else "" + self.baseline_spool = str(data["BaseLine_Spool"]) if "BaseLine_Spool" in data else "" + self.check_value = (bool(data["CheckValue"]) if "CheckValue" in data else None) + self.poui_login = bool(data["POUILogin"]) if "POUILogin" in data else False + self.poui = bool(data["POUI"]) if "POUI" in data else False + self.log_info_config = bool(data["LogInfoConfig"]) if "LogInfoConfig" in data else False + self.release = str(data["Release"]) if "Release" in data else "12.1.2210" + self.top_database = str(data["TopDataBase"]) if "TopDataBase" in data else "MSSQL" + self.lib_version = str(data["Lib"]) if "Lib" in data else "lib_version" + self.build_version = str(data["Build"]) if "Build" in data else "build_version" + self.appserver_folder = str(data["AppServerFolder"]) if "AppServerFolder" in data else "" + self.destination_folder = str(data["DestinationFolder"]) if "DestinationFolder" in data else "" + self.appserver_service = str(data["AppServerService"]) if "AppServerService" in data else "" + self.check_dump = ("CheckDump" in data and bool(data["CheckDump"])) + self.chromedriver_auto_install = ("ChromeDriverAutoInstall" in data and bool(data["ChromeDriverAutoInstall"])) + self.ssl_chrome_auto_install_disable = ( + "SSLChromeInstallDisable" in data and bool(data["SSLChromeInstallDisable"])) + self.data_delimiter = str(data["DataDelimiter"]) if "DataDelimiter" in data else "/" + self.procedure_menu = str(data["ProcedureMenu"]) if "ProcedureMenu" in data else "" + self.valid_language = self.language != "" + self.initial_program = "" + self.routine = "" + self.date = "" + self.group = "" + self.branch = "" + self.module = "" + self.routine_type = "" + + def check_keys(self, json_data): valid_keys = [ - "ipExec", - "UrlSetStartExec", - "UrlSetEndExec", - "ScreenShot", - "Country", - "ExecId", - "NumExec", - "MotExec", - "Url", - "Browser", - "Environment", - "User", - "Password", - "Language", - "SkipEnvironment", - "Headless", - "LogFolder", - "LogFile", - "DebugLog", - "TimeOut", - "ParameterMenu", - "ScreenshotFolder", - "Coverage", - "SkipRestart", - "SmartTest", - "SmartERP", - "UserCfg", - "PasswordCfg", - "BinPath", - "CSVPath", - "DBDriver", - "DBServer", - "DBPort", - "DBName", - "DBUser", - "DBPassword", - "DBQOracleServer", - "URL_TSS", - "StartProgram", - "NewLog", - "LogUrl1", - "LogUrl2", - "ParameterUrl", - "LogHttp", - "BaseLine_Spool", - "CheckValue", - "POUILogin", - "POUI", - "LogInfoConfig", - "Release", - "TopDataBase", - "Lib", - "Build", - "AppServerFolder", - "DestinationFolder", - "AppServerService", - "CheckDump", - "ChromeDriverAutoInstall", - "SSLChromeInstallDisable", - "DataDelimiter", - "ProcedureMenu" -] + "ipExec", + "UrlSetStartExec", + "UrlSetEndExec", + "ScreenShot", + "Country", + "ExecId", + "NumExec", + "MotExec", + "Url", + "Browser", + "Environment", + "User", + "Password", + "Language", + "SkipEnvironment", + "Headless", + "LogFolder", + "LogFile", + "DebugLog", + "TimeOut", + "ParameterMenu", + "ScreenshotFolder", + "Coverage", + "SkipRestart", + "SmartTest", + "SmartERP", + "UserCfg", + "PasswordCfg", + "BinPath", + "CSVPath", + "DBDriver", + "DBServer", + "DBPort", + "DBName", + "DBUser", + "DBPassword", + "DBQOracleServer", + "URL_TSS", + "StartProgram", + "NewLog", + "LogUrl1", + "LogUrl2", + "ParameterUrl", + "LogHttp", + "BaseLine_Spool", + "CheckValue", + "POUILogin", + "POUI", + "LogInfoConfig", + "Release", + "TopDataBase", + "Lib", + "Build", + "AppServerFolder", + "DestinationFolder", + "AppServerService", + "CheckDump", + "ChromeDriverAutoInstall", + "SSLChromeInstallDisable", + "DataDelimiter", + "ProcedureMenu" + ] keys_json = set(json_data.keys()) wrong_keys = keys_json - set(valid_keys) diff --git a/tir/technologies/core/logging_config.py b/tir/technologies/core/logging_config.py index 7c64eb69..f1338c97 100644 --- a/tir/technologies/core/logging_config.py +++ b/tir/technologies/core/logging_config.py @@ -8,98 +8,11 @@ import socket import inspect - -config = ConfigLoader() - filename = None folder = None file_path = None - -def logger(logger='root'): - """ - :return: - """ - - global filename - global folder - global file_path - - if config.smart_test or config.debug_log: - - logger = 'console' # TODO configuração temporaria para o server. - - today = datetime.today() - - file_handler = True if logger == 'root' else False - - if not file_path and file_handler: - filename = f"TIR_{get_file_name('testsuite')}_{today.strftime('%Y%m%d%H%M%S%f')[:-3]}.log" - - folder = create_folder() - - file_path = create_file(folder, filename) - - logging_config = { - 'version': 1, - 'loggers': { - 'root': { # root logger - 'level': 'DEBUG', - 'handlers': ['debug_console_handler', 'debug_file_handler'] - }, - 'console': { # console logger - 'level': 'DEBUG', - 'handlers': ['debug_console_handler'] - } - }, - 'handlers': { - 'debug_console_handler': { - 'level': 'DEBUG', - 'formatter': 'info', - 'class': 'logging.StreamHandler', - 'stream': 'ext://sys.stdout', - }, - 'debug_file_handler': { - 'level': 'DEBUG', - 'formatter': 'info', - 'filename': Path(folder, filename) if file_handler else 'none.log', - 'class': 'logging.FileHandler', - 'mode': 'a' - }, - }, - 'formatters': { - 'info': { - 'format': '%(asctime)s-%(levelname)s-%(name)s-%(process)d::%(module)s::%(funcName)s|%(lineno)s:: %(message)s' - }, - }, - } - - else: - - logging_config = { - 'version': 1, - 'loggers': { - 'root': { # root logger - 'level': 'INFO', - 'handlers': ['debug_console_handler'] - }, - }, - 'handlers': { - 'debug_console_handler': { - 'level': 'INFO', - 'formatter': 'info', - 'class': 'logging.StreamHandler', - 'stream': 'ext://sys.stdout', - }, - }, - 'formatters': { - 'info': { - 'format': '%(asctime)s-%(levelname)s:: %(message)s' - }, - }, - } - - dictConfig(logging_config) - return logging.getLogger(logger) +config = None +_logger = None def get_file_name(file_name): """ @@ -125,36 +38,151 @@ def create_folder(): """ path = None + folder_path = None + error = None try: if config.log_http: - folder_path = Path(config.log_http, config.country, config.release, config.issue, - config.execution_id, get_file_name('testsuite')) - path = Path(folder_path) + folder_path = Path(config.log_http, config.country, config.release, config.issue,config.execution_id, get_file_name('testsuite')) + os.makedirs(Path(folder_path)) + elif config.log_folder: + folder_path = Path(config.log_folder) os.makedirs(Path(folder_path)) else: path = Path("Log", socket.gethostname()) os.makedirs(Path("Log", socket.gethostname())) - except OSError: - pass + except Exception as e: + error = e + + if folder_path: + path = str(Path(folder_path)) - return path + if os.path.isdir(path): + return str(path) + else: + raise Exception(f"Folder path not found: {path}: {str(error)}") -def create_file(folder, filename): +def create_file(): """ Creates an empty file before logger [Internal] """ + today = datetime.today() + + filename = f"TIR_{get_file_name('testsuite')}_{today.strftime('%Y%m%d%H%M%S%f')[:-3]}.log" + + folder = create_folder() + success = False - endtime = time.time() + config.time_out + error = None + endtime = time.time() + config.time_out while (time.time() < endtime and not success): try: with open(Path(folder, filename), "w", ): - return True - except Exception as error: - time.sleep(30) - logger().debug(str(error)) + return str(Path(folder, filename)) + except Exception as e: + time.sleep(5) + error = str(e) + print(error) + + if error: + return error + +def configure_logger(): + """ + :return: + """ + + global _logger + global filename + global folder + global file_path + global config + + config = ConfigLoader() + + if not config._json_data: + return + + logger_profile = 'user_console' + + if config.debug_log: + logger_profile = 'root' + else: + logger_profile = 'debug_console' if config.smart_test else logger_profile + + if logger_profile == 'root': + file_path = create_file() + + logging_config = { + 'version': 1, + 'formatters': { + 'debug': { + 'format': '%(asctime)s-%(levelname)s-%(name)s-%(process)d::%(module)s::%(funcName)s|%(lineno)s:: %(message)s' + }, + 'info':{ + 'format': '%(asctime)s-%(levelname)s:: %(message)s' + } + }, + 'handlers': { + 'debug_console_handler': { + 'level': 'DEBUG', + 'formatter': 'debug', + 'class': 'logging.StreamHandler', + 'stream': 'ext://sys.stdout', + }, + 'info_console_handler': { + 'level': 'INFO', + 'formatter': 'info', + 'class': 'logging.StreamHandler', + 'stream': 'ext://sys.stdout', + } + }, + 'loggers': { + 'root': { # root logger + 'level': 'DEBUG', + 'handlers': ['debug_console_handler'] + }, + 'debug_console': { # console logger + 'level': 'DEBUG', + 'handlers': ['debug_console_handler'] + }, + 'user_console': { # user console logger + 'level': 'INFO', + 'handlers': ['info_console_handler'] + }, + }, + } + + if file_path and os.path.exists(file_path): + logging_config['handlers']['memory_handler'] = { + 'level': 'DEBUG', + 'formatter': 'debug', + 'class': 'logging.handlers.MemoryHandler', + 'capacity': 5*1024*1024, + 'flushLevel': logging.CRITICAL, + 'target': 'debug_file_handler' + } + logging_config['handlers']['debug_file_handler'] = { + 'level': 'DEBUG', + 'formatter': 'debug', + 'class': 'logging.FileHandler', + 'filename': file_path, + 'mode': 'a' + } + logging_config['loggers']['root']['handlers'].append('memory_handler') + logging_config['loggers']['root']['handlers'].append('debug_file_handler') + + dictConfig(logging_config) + _logger = logging.getLogger(logger_profile) + +def logger(): + global _logger + if _logger is None: + configure_logger() + _logger.debug(f"Log file created: '{file_path}'") + return _logger diff --git a/tir/technologies/core/numexec.py b/tir/technologies/core/numexec.py index 2850592c..abeba755 100644 --- a/tir/technologies/core/numexec.py +++ b/tir/technologies/core/numexec.py @@ -7,10 +7,10 @@ import os -class NumExec(ConfigLoader): +class NumExec(): def __init__(self): - super().__init__() + self.config = ConfigLoader() def post_exec(self, url, numexec_folder): @@ -34,20 +34,20 @@ def post_exec(self, url, numexec_folder): time.sleep(12) if error: - response = str(f"STATUS: {status} Url: {url} ID: {self.num_exec} Error: {error}") + response = str(f"STATUS: {status} Url: {url} ID: {self.config.num_exec} Error: {error}") else: - response = str(f"STATUS: {status} Url: {url} ID: {self.num_exec}") + response = str(f"STATUS: {status} Url: {url} ID: {self.config.num_exec}") logger().debug(response) if status not in success_response: try: - path = Path(self.log_folder, numexec_folder) + path = Path(self.config.log_folder, numexec_folder) os.makedirs(path) except OSError: pass - with open(Path(path, f"{self.num_exec}_TIR_{strftime}.txt"), "w") as json_log: + with open(Path(path, f"{self.config.num_exec}_TIR_{strftime}.txt"), "w") as json_log: json_log.write(response) return status in success_response @@ -63,7 +63,7 @@ def send_request(self, url): "https": None, } - data = {'num_exec': self.num_exec, 'ip_exec': self.ipExec} + data = {'num_exec': self.config.num_exec, 'ip_exec': self.config.ipExec} response = requests.post(url.strip(), json=data, proxies=proxies) diff --git a/tir/technologies/poui_internal.py b/tir/technologies/poui_internal.py index b75e042d..1f4ada8d 100644 --- a/tir/technologies/poui_internal.py +++ b/tir/technologies/poui_internal.py @@ -93,7 +93,6 @@ def __init__(self, config_path="", autostart=True): self.tmenu_screen = None self.grid_memo_field = False self.range_multiplier = None - self.routine = None if not Base.driver: Base.driver = self.driver @@ -1289,9 +1288,9 @@ def restart(self): if self.config.routine: - if self.routine == 'SetLateralMenu': + if self.config.routine_type.lower() == 'setlateralmenu': self.SetLateralMenu(self.config.routine, save_input=False) - elif self.routine == 'Program': + elif self.config.routine_type.lower() == 'program': self.set_program(self.config.routine) def driver_refresh(self): @@ -3251,11 +3250,6 @@ def click_poui_component(self, field, value, position, selector, container): self.poui_click(element) - else: - select_element = main_element.select('select') if hasattr(main_element, 'select') else None - if select_element: - select_element = next(iter(select_element)) - self.select_combo(select_element, value, index=True, shadow_root=False) def poui_click(self, element): @@ -3419,14 +3413,11 @@ def POSearch(self, content, placeholder): self.wait_element(term='po-page') endtime = time.time() + self.config.time_out while (not element and time.time() < endtime): - po_page = next(iter( - self.web_scrap(term="[class='po-page']", scrap_type=enum.ScrapType.CSS_SELECTOR, - main_container='body')), - None) - + po_page = next(iter(self.web_scrap(term="[class='po-page']", scrap_type=enum.ScrapType.CSS_SELECTOR, + main_container='body')),None) if po_page: - page_list = next(iter(po_page.find_all_next('div', 'po-page-list-filter-wrapper')), None) - + page_list = po_page.find_all_next('div', 'po-page-list-filter-wrapper') + page_list = next(iter(list(filter(lambda x: self.element_is_displayed(x), page_list))),None) if page_list: input = page_list.select('input') @@ -3440,14 +3431,15 @@ def POSearch(self, content, placeholder): if not element: self.log_error("Couldn't find element") - + self.switch_to_iframe() element().clear() element().send_keys(content) - action = lambda: self.soup_to_selenium(next(iter(input.parent.select('span')))) + action = lambda: self.soup_to_selenium(next(iter(input.parent.select('po-icon')))) ActionChains(self.driver).move_to_element(action()).click().perform() + def ClickTable(self, first_column, second_column, first_content, second_content, table_number, itens, click_cell, checkbox): """ Clicks on the Table of POUI component. @@ -3612,7 +3604,7 @@ def POTabs(self, label): self.wait_element(term="[class='po-tabs-container']", scrap_type=enum.ScrapType.CSS_SELECTOR) - po_tab_button = self.web_scrap(term="[class='po-tab-button']", scrap_type=enum.ScrapType.CSS_SELECTOR, + po_tab_button = self.web_scrap(term="po-tab-button", scrap_type=enum.ScrapType.CSS_SELECTOR, main_container='body') label_element = next( @@ -3763,6 +3755,7 @@ def click_popup(self, label): if element: self.poui_click(element) + def click_checkbox(self, label): """Click on the POUI Checkbox. https://po-ui.io/documentation/po-checkbox @@ -3794,4 +3787,80 @@ def click_checkbox(self, label): self.poui_click(container_element) if not container_element: - self.log_error(f"CheckBox '{label}' doesn't found!") \ No newline at end of file + self.log_error(f"CheckBox '{label}' doesn't found!") + + + def click_combo(self, field, value, position): + '''Select a value for list combo inputs. + + :param field: label of field + :type : str + :param value: value to input on field + :type : str + :param position: + :type : int + :return: + ''' + + main_element = None + success = None + position -= 1 + current_value = None + replace = r'[\s,\.:]' + + logger().info(f"Clicking on {field}") + self.wait_element(term='po-combo') + + endtime = time.time() + self.config.time_out + while (not success and time.time() < endtime): + po_combo = self.web_scrap(term='po-combo', scrap_type=enum.ScrapType.CSS_SELECTOR, position=position, main_container='body') + if po_combo: + po_combo = next(iter(po_combo),None) + po_input = po_combo.find_next('input') + if po_input: + self.open_input_combo(po_combo) + self.click_po_list_box(value) + current_value = self.get_web_value(self.soup_to_selenium(po_input, twebview=True)) + success = re.sub(replace, '', current_value).lower() == re.sub(replace, '', value).lower() + if not success: + self.log_error(f'Click on {value} of {field} Fail. Please Check') + + + def click_po_list_box(self, value): + ''' + :param value: Value to select on po-list-box + :type str + :return: + ''' + replace = r'[\s,\.:]' + value = value.strip().lower() + self.wait_element(term='po-listbox') + + po_list_itens = self.web_scrap(term='.po-item-list', scrap_type=enum.ScrapType.CSS_SELECTOR, + main_container='body') + + if po_list_itens: + item_filtered = next(iter(list(filter(lambda x: re.sub(replace, '', x.text.strip().lower()) == + re.sub(replace, '', value), po_list_itens))), None) + if item_filtered: + self.scroll_to_element(self.soup_to_selenium(item_filtered, twebview=True)) + self.click(self.soup_to_selenium(item_filtered, twebview=True)) + else: + self.log_error(f'{value} not found') + + + def open_input_combo(self, po_combo): + ''' + :param po_combo: po-combo object + :type: Bs4 object + :return: + ''' + + combo_container = next(iter(po_combo.select('.po-combo-container')),None) + combo_input = next(iter(po_combo.select('input')), None) + + if combo_container: + closed_combo = lambda: self.soup_to_selenium(combo_container, twebview=True).get_attribute('hidden') + endtime = time.time() + self.config.time_out + while (closed_combo() and time.time() < endtime): + self.click(self.soup_to_selenium(combo_input, twebview=True)) diff --git a/tir/technologies/webapp_internal.py b/tir/technologies/webapp_internal.py index 789c57b5..747ec106 100644 --- a/tir/technologies/webapp_internal.py +++ b/tir/technologies/webapp_internal.py @@ -9,6 +9,9 @@ import shutil import cv2 import socket +import pathlib +import sys +import tir.technologies.core.enumerations as enum from functools import reduce from selenium.webdriver.common.keys import Keys from bs4 import BeautifulSoup, Tag @@ -16,7 +19,6 @@ from selenium.webdriver.common.by import By from selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.support.ui import Select -import tir.technologies.core.enumerations as enum from tir.technologies.core import base from tir.technologies.core.log import Log, nump from tir.technologies.core.config import ConfigLoader @@ -29,8 +31,19 @@ from selenium.common.exceptions import * from datetime import datetime from tir.technologies.core.logging_config import logger -import pathlib -import sys +from tir.technologies.core.base_database import BaseDatabase + +def count_time(func): + """ + Decorator to count the time spent in a function. + """ + def wrapper(*args, **kwargs): + starttime = time.time() + result = func(*args, **kwargs) + endtime = time.time() + logger().debug(f"Time spent in {func.__name__}: {endtime - starttime}") + return result + return wrapper class WebappInternal(Base): """ @@ -104,7 +117,6 @@ def __init__(self, config_path="", autostart=True): self.tmenu_screen = None self.grid_memo_field = False self.range_multiplier = None - self.routine = None self.test_suite = [] self.current_test_suite = self.log.get_file_name('testsuite') self.restart_tss = False @@ -242,6 +254,8 @@ def Setup(self, initial_program, date='', group='99', branch='01', module='', sa if not self.log.program: self.log.program = self.get_program_name() + server_environment = self.config.environment + if save_input: self.config.initial_program = initial_program self.config.date = self.date_format(date) @@ -251,14 +265,14 @@ def Setup(self, initial_program, date='', group='99', branch='01', module='', sa if self.config.coverage: self.open_url_coverage(url=self.config.url, initial_program=initial_program, - environment=self.config.environment) + environment=server_environment) if not self.config.valid_language: self.config.language = self.get_language() self.language = LanguagePack(self.config.language) if not self.config.skip_environment and not self.config.coverage: - self.program_screen(initial_program=initial_program, coverage=False, poui=self.config.poui_login) + self.program_screen(initial_program=initial_program, environment=server_environment, poui=self.config.poui_login) self.log.webapp_version = self.driver.execute_script("return app.VERSION") @@ -371,7 +385,7 @@ def service_process_bat_file(self): with open("firefox_task_kill.bat", "w", ) as firefox_task_kill: firefox_task_kill.write(f"taskkill /f /PID {self.driver.service.process.pid} /T") - def program_screen(self, initial_program="", environment="", coverage=False, poui=False): + def program_screen(self, initial_program="", environment="", poui=False): """ [Internal] @@ -388,112 +402,92 @@ def program_screen(self, initial_program="", environment="", coverage=False, pou >>> self.program_screen("SIGAADV", "MYENVIRONMENT") """ + if not environment: + environment = self.config.environment + self.config.poui_login = poui - if coverage: - self.open_url_coverage(url=self.config.url, initial_program=initial_program, - environment=self.config.environment) + self.filling_initial_program(initial_program) + self.filling_server_environment(environment) + + if self.webapp_shadowroot(): + self.wait_until_to(expected_condition = "element_to_be_clickable", element=".startParameters", locator = By.CSS_SELECTOR) + parameters_screen = self.driver.find_element(By.CSS_SELECTOR, ".startParameters") + buttons = self.find_shadow_element('wa-button', parameters_screen) + button = next(iter(list(filter(lambda x: 'ok' in x.text.lower().strip(), buttons))), None) else: - try_counter = 0 - if self.webapp_shadowroot(): - start_program = '#selectStartProg' - input_environment = '#selectEnv' - else: - start_program = '#inputStartProg' - input_environment = '#inputEnv' + button = self.driver.find_element(By.CSS_SELECTOR, ".button-ok") - self.wait_element(term=start_program, scrap_type=enum.ScrapType.CSS_SELECTOR, main_container="body") - self.wait_element(term=input_environment, scrap_type=enum.ScrapType.CSS_SELECTOR, main_container="body") - soup = self.get_current_DOM() + self.click(button) - start_prog_element = next(iter(soup.select(start_program)), None) - if start_prog_element is None: - self.restart_counter += 1 - message = "Couldn't find Initial Program input element." - self.log_error(message) - raise ValueError(message) + def filling_initial_program(self, initial_program): + """ + [Internal] + """ - start_prog = lambda: self.soup_to_selenium(start_prog_element) + if self.webapp_shadowroot(): + start_program = '#selectStartProg' + else: + start_program = '#inputStartProg' - endtime = time.time() + self.config.time_out - if self.webapp_shadowroot(): - start_prog_value = lambda: self.get_web_value(next(iter(self.find_shadow_element('input', start_prog())))).strip() if self.find_shadow_element('input', start_prog()) else None - else: - start_prog_value = lambda: self.get_web_value(start_prog()) + logger().info(f'Filling Initial Program: "{initial_program}"') - endtime = time.time() + self.config.time_out - while (time.time() < endtime and (start_prog_value() != initial_program.strip())): + self.fill_select_element(term=start_program, user_value=initial_program) + + def filling_server_environment(self, environment): + """ + [Internal] + """ + + if self.webapp_shadowroot(): + input_environment = '#selectEnv' + else: + input_environment = '#inputEnv' - logger().info(f'Filling Initial Program: "{initial_program}"') + logger().info(f'Filling Server Environment: "{environment}"') - start_prog = lambda: self.soup_to_selenium(start_prog_element) + self.fill_select_element(term=input_environment, user_value=environment) + + def fill_select_element(self, term, user_value): - self.set_element_focus(start_prog()) - self.click(start_prog()) - ActionChains(self.driver).key_down(Keys.CONTROL).send_keys(Keys.HOME).key_up(Keys.CONTROL).perform() - ActionChains(self.driver).key_down(Keys.CONTROL).key_down(Keys.SHIFT).send_keys( - Keys.END).key_up(Keys.CONTROL).key_up(Keys.SHIFT).perform() - self.try_send_keys(start_prog, initial_program, try_counter) - try_counter += 1 + self.wait_element(term=term, scrap_type=enum.ScrapType.CSS_SELECTOR, main_container="body") + + element_value = '' + try_counter = 0 - if try_counter > 4: - try_counter = 0 + endtime = time.time() + self.config.time_out + while (time.time() < endtime and (element_value != user_value.strip())): - if (start_prog_value() != initial_program.strip()): - self.restart_counter += 1 - message = "Couldn't fill Program input element." - self.log_error(message) - raise ValueError(message) + soup = self.get_current_DOM() - env_element = next(iter(soup.select(input_environment)), None) - if env_element is None: + soup_element = next(iter(soup.select(term)), None) + if soup_element is None: self.restart_counter += 1 - message = "Couldn't find Environment input element." + message = f"Couldn't find '{term}' element." self.log_error(message) raise ValueError(message) - env = lambda: self.soup_to_selenium(env_element) - - if self.webapp_shadowroot(): - env_value = lambda: self.get_web_value(next(iter(self.find_shadow_element('input', env())), None)) - else: - env_value = lambda: self.get_web_value(env()) - - endtime = time.time() + self.config.time_out - try_counter = 0 - while (time.time() < endtime and (env_value().strip() != self.config.environment.strip())): - - logger().info(f'Filling Environment: "{self.config.environment}"') - - if try_counter == 0: - env = lambda: self.soup_to_selenium(env_element) - else: - env = lambda: self.soup_to_selenium(env_element.parent) - - self.set_element_focus(env()) - self.click(env()) - ActionChains(self.driver).key_down(Keys.CONTROL).send_keys(Keys.HOME).key_up(Keys.CONTROL).perform() - ActionChains(self.driver).key_down(Keys.CONTROL).key_down(Keys.SHIFT).send_keys( - Keys.END).key_up(Keys.CONTROL).key_up(Keys.SHIFT).perform() - self.send_keys(env(), self.config.environment) - try_counter += 1 if (try_counter < 1) else -1 + element = lambda: self.soup_to_selenium(soup_element) - if (env_value().strip() != self.config.environment.strip()): - self.restart_counter += 1 - message = "Couldn't fill Environment input element." - self.log_error(message) - raise ValueError(message) + self.set_element_focus(element()) + self.click(element()) + self.try_send_keys(element, user_value, try_counter) + try_counter += 1 + if try_counter > 4: + try_counter = 0 + if self.webapp_shadowroot(): - self.wait_until_to(expected_condition = "element_to_be_clickable", element=".startParameters", locator = By.CSS_SELECTOR) - parameters_screen = self.driver.find_element(By.CSS_SELECTOR, ".startParameters") - buttons = self.find_shadow_element('wa-button', parameters_screen) - button = next(iter(list(filter(lambda x: 'ok' in x.text.lower().strip(), buttons))), None) + element_value = self.get_web_value(next(iter(self.find_shadow_element('input', element())))).strip() if self.find_shadow_element('input', element()) else None else: - button = self.driver.find_element(By.CSS_SELECTOR, ".button-ok") + element_value = self.get_web_value(element()) - self.click(button) + if (element_value.strip() != user_value.strip()): + self.restart_counter += 1 + message = f"Couldn't fill '{term}' element." + self.log_error(message) + raise ValueError(message) def user_screen(self, admin_user=False): """ @@ -690,13 +684,15 @@ def reload_user_screen(self): logger().debug('Reloading user screen') + server_environment = self.config.environment + self.restart_browser() if self.config.coverage: self.driver.get(f"{self.config.url}/?StartProg=CASIGAADV&A={self.config.initial_program}&Env={self.config.environment}") if not self.config.skip_environment and not self.config.coverage: - self.program_screen(self.config.initial_program) + self.program_screen(self.config.initial_program, environment=server_environment) self.wait_element_timeout(term="[name='cGetUser']", scrap_type=enum.ScrapType.CSS_SELECTOR, timeout = self.config.time_out , main_container='body') @@ -1577,7 +1573,7 @@ def Program(self, program_name): >>> # Calling the method: >>> oHelper.Program("MATA020") """ - self.routine = 'Program' + self.config.routine_type = 'Program' self.config.routine = program_name if self.config.log_info_config: @@ -3233,6 +3229,8 @@ def restart(self): """ webdriver_exception = None + server_environment = self.config.environment + if self.config.appserver_service: try: self.sc_query(self.config.appserver_service) @@ -3261,7 +3259,7 @@ def restart(self): if self.config.initial_program != '' and self.restart_counter < 3: if not self.config.skip_environment and not self.config.coverage: - self.program_screen(self.config.initial_program) + self.program_screen(self.config.initial_program, environment=server_environment) self.wait_user_screen() if 'POUILogin' in self.config.json_data and self.config.json_data['POUILogin'] == True: @@ -3288,9 +3286,9 @@ def restart(self): if self.config.routine: - if self.routine == 'SetLateralMenu': + if self.config.routine_type == 'SetLateralMenu': self.SetLateralMenu(self.config.routine, save_input=False) - elif self.routine == 'Program': + elif self.config.routine_type == 'Program': self.set_program(self.config.routine) def wait_user_screen(self): @@ -3349,32 +3347,9 @@ def Finish(self): >>> oHelper.Finish() """ element = None - text_cover = None - string = self.language.codecoverage #"Aguarde... Coletando informacoes de cobertura de codigo." - timeout = 900 - optional_term = "wa-button" if self.webapp_shadowroot() else "button, .thbutton" if self.config.coverage: - endtime = time.time() + timeout - - while((time.time() < endtime) and (not element or not text_cover)): - - ActionChains(self.driver).key_down(Keys.CONTROL).perform() - ActionChains(self.driver).key_down('q').perform() - ActionChains(self.driver).key_up(Keys.CONTROL).perform() - - element = self.wait_element_timeout(term=self.language.finish, scrap_type=enum.ScrapType.MIXED, - optional_term=optional_term, timeout=5, step=1, main_container="body", check_error = False) - - if element: - self.SetButton(self.language.finish) - text_cover = self.WaitShow(string, timeout=10, throw_error=False) - if text_cover: - logger().debug(string) - logger().debug("Waiting for coverage to finish.") - self.WaitHide(string, throw_error=False) - logger().debug("Finished coverage.") - + self.get_coverage() else: endtime = time.time() + self.config.time_out while( time.time() < endtime and not element ): @@ -3389,6 +3364,52 @@ def Finish(self): if not element: logger().warning("Warning method finish use driver.refresh. element not found") + @count_time + def get_coverage(self): + + timeout = 1800 + optional_term = "wa-button" if self.webapp_shadowroot() else "button, .thbutton" + current_layers = 0 + coverage_finished = False + element = None + new_modal = False + coverage_exceed_timeout = False + + endtime = time.time() + timeout + + logger().debug("Startin coverage.") + + while not coverage_finished: + + if time.time() > endtime: + + if coverage_exceed_timeout: + logger().debug("Coverage exceed timeout.") + break + + logger().debug("Coverage exceed default timeout. adding more time.") + endtime = time.time() + timeout + coverage_exceed_timeout = True + + if not element: + ActionChains(self.driver).key_down(Keys.CONTROL).send_keys('q').key_up(Keys.CONTROL).perform() + element = self.wait_element_timeout(term=self.language.finish, scrap_type=enum.ScrapType.MIXED, + optional_term=optional_term, timeout=5, step=1, main_container="body", check_error=False) + + if element and not new_modal: + current_layers = self.check_layers('wa-dialog') + self.SetButton(self.language.finish) + new_modal = current_layers + 1 == self.check_layers('wa-dialog') + logger().debug("Waiting for coverage to finish.") + + if new_modal: + coverage_finished = current_layers >= self.check_layers('wa-dialog') + + if coverage_finished: + logger().debug("Coverage finished.") + + time.sleep(1) + def click_button_finish(self, click_counter=None): """ [internal] @@ -3424,35 +3445,9 @@ def LogOff(self): >>> oHelper.LogOff() """ element = None - text_cover = None - string = self.language.codecoverage #"Aguarde... Coletando informacoes de cobertura de codigo." - timeout = 900 - click_counter = 1 if self.config.coverage: - endtime = time.time() + timeout - - while((time.time() < endtime) and (not element or not text_cover)): - - ActionChains(self.driver).key_down(Keys.CONTROL).perform() - ActionChains(self.driver).key_down('q').perform() - ActionChains(self.driver).key_up(Keys.CONTROL).perform() - - element = self.wait_element_timeout(term=self.language.logOff, scrap_type=enum.ScrapType.MIXED, - optional_term=".tsay", timeout=5, step=1, main_container="body", check_error = False) - - if element: - if self.click_button_logoff(click_counter): - text_cover = self.search_text(selector=".tsay", text=string) - if text_cover: - logger().info(string) - timeout = endtime - time.time() - if timeout > 0: - self.wait_element_timeout(term=string, scrap_type=enum.ScrapType.MIXED, - optional_term=".tsay", timeout=timeout, step=0.1, main_container="body", check_error = False) - click_counter += 1 - if click_counter > 3: - click_counter = 1 + self.get_coverage() else: endtime = time.time() + self.config.time_out while( time.time() < endtime and not element ): @@ -3978,7 +3973,7 @@ def SetLateralMenu(self, menu_itens, save_input=True, click_menu_functional=Fals self.log_error_newlog() if save_input: - self.routine = 'SetLateralMenu' + self.config.routine_type = 'SetLateralMenu' self.config.routine = menu_itens if self.webapp_shadowroot(): @@ -4272,9 +4267,12 @@ def SetButton(self, button, sub_item="", position=1, check_error=True): try: restore_zoom = False soup_element = "" - if (button.lower() == "x"): + if (button.lower().strip() == "x"): self.set_button_x(position, check_error) return + elif (button.strip() == "?"): + self.set_button_character(term=button, position=position, check_error=check_error) + return else: self.wait_element_timeout(term=button, scrap_type=enum.ScrapType.MIXED, optional_term=term_button, timeout=10, step=0.1, check_error=check_error) position -= 1 @@ -4489,6 +4487,37 @@ def SetButton(self, button, sub_item="", position=1, check_error=True): if self.config.smart_test or self.config.debug_log: logger().debug(f"***System Info*** After Clicking on button:") system_info() + + def set_button_character(self, term, position=1, check_error=True): + """ + [Internal] + """ + + position -= 1 + button = None + + self.wait_element(term=term, position=position, check_error=check_error, main_container=self.containers_selectors["AllContainers"]) + + endtime = time.time() + self.config.time_out + + while time.time() < endtime and not button: + + container = self.get_current_container() + + buttons = container.select('button') + + buttons_displayed = list(filter(lambda x: self.element_is_displayed(x), buttons)) + + filtered_button = list(filter(lambda x: x.text.strip().lower() == term.strip().lower(), buttons_displayed)) + + if filtered_button and len(filtered_button) - 1 >= position: + button = filtered_button[position] + + element = self.soup_to_selenium(button) + + self.scroll_to_element(element) + self.wait_until_to(expected_condition="element_to_be_clickable", element=button, locator=By.XPATH) + self.click(element) def set_button_x(self, position=1, check_error=True): endtime = self.config.time_out/2 @@ -8973,7 +9002,6 @@ def click_tree(self, treepath, right_click, position, tree_number): element_click = lambda: self.soup_to_selenium(element_class_item) if last_item: - start_time = time.time() self.wait_blocker() if self.webapp_shadowroot(): element_is_closed = lambda: element.get_attribute('closed') == 'true' or element.get_attribute('closed') == '' @@ -8992,11 +9020,17 @@ def click_tree(self, treepath, right_click, position, tree_number): success = True if is_element_acessible() else False if success and right_click: - if self.webapp_shadowroot(): - self.click(element_click(), enum.ClickType.SELENIUM, - right_click) - else: - self.send_action(action=self.click, element=element_click, right_click=right_click) + last_zindex = self.return_last_zindex() + current_zindex = last_zindex + + endtime_right_click = time.time() + self.config.time_out / 3 + while time.time() < endtime_right_click and last_zindex <= current_zindex: + if self.webapp_shadowroot(): + self.click(element_click(), enum.ClickType.SELENIUM, + right_click) + current_zindex = self.return_last_zindex() + else: + self.send_action(action=self.click, element=element_click, right_click=right_click) else: self.scroll_to_element(element_click()) element_click().click() @@ -9441,16 +9475,13 @@ def TearDown(self): endtime = time.time() + self.config.time_out while (time.time() < endtime and ( not self.element_exists(term=term, scrap_type=enum.ScrapType.CSS_SELECTOR, main_container="body"))): - self.close_warning_screen() + self.close_screen_before_menu() self.Finish() elif not webdriver_exception: self.SetupTSS(self.config.initial_program, self.config.environment ) self.SetButton(self.language.exit) self.SetButton(self.language.yes) - if (self.search_text(selector=".tsay", text=string) and not webdriver_exception): - self.WaitProcessing(string, timeout) - if len(self.log.table_rows[1:]) > 0 and not self.log.has_csv_condition(): self.log.save_file() @@ -10001,14 +10032,26 @@ def ClickListBox(self, text): >>> oHelper.ClickListBox("text") """ - self.wait_element(term='.tlistbox', scrap_type=enum.ScrapType.CSS_SELECTOR, main_container=".tmodaldialog") + self.wait_element(term='.tlistbox, .dict-tlistbox', scrap_type=enum.ScrapType.CSS_SELECTOR, main_container=".tmodaldialog, wa-dialog") container = self.get_current_container() - tlist = container.select(".tlistbox") - list_option = next(iter(list(filter(lambda x: x.select('option'), tlist)))) + term = '.dict-tlistbox' if self.webapp_shadowroot() else '.tlistbox' + + tlist = container.select(term) + + if self.webapp_shadowroot(): + list_option = next(iter(list(map(lambda x: self.find_shadow_element('option', self.soup_to_selenium(x)), tlist)))) + else: + list_option = next(iter(list(filter(lambda x: x.select('option'), tlist)))) + list_option_filtered = list(filter(lambda x: self.element_is_displayed(x), list_option)) element = next(iter(filter(lambda x: x.text.strip() == text.strip(), list_option_filtered)), None) - element_selenium = self.soup_to_selenium(element) - self.wait_until_to(expected_condition="element_to_be_clickable", element = element, locator = By.XPATH ) + + if self.webapp_shadowroot(): + element_selenium = element + else: + element_selenium = self.soup_to_selenium(element) + self.wait_until_to(expected_condition="element_to_be_clickable", element = element, locator = By.XPATH ) + element_selenium.click() def ClickImage(self, img_name, double_click=False): @@ -11114,4 +11157,14 @@ def get_container_selector(self, selector): container = self.get_current_container() - return container.select(selector) \ No newline at end of file + return container.select(selector) + + def query_execute(self, query, database_driver, dbq_oracle_server, database_server, database_port, database_name, database_user, database_password): + """ + Execute a query in a database + """ + base_database = BaseDatabase() + try: + return base_database.query_execute(query, database_driver, dbq_oracle_server, database_server, database_port, database_name, database_user, database_password) + except Exception as e: + self.log_error(f"Error in query_execute: {str(e)}") \ No newline at end of file diff --git a/tir/version.py b/tir/version.py index 905bd103..679650fa 100644 --- a/tir/version.py +++ b/tir/version.py @@ -1 +1 @@ -__version__ = '1.20.27' +__version__ = '1.20.28'