From 1d619a2dfe1486f5ad326dc22d843f840018ae94 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Thu, 26 Dec 2024 19:20:46 +0530 Subject: [PATCH 1/5] Add support for shell import scripts in automation --- .../executor/app/executor/import_executor.py | 55 +++++++++++++++---- .../executor/schedule_update_import.py | 6 +- 2 files changed, 47 insertions(+), 14 deletions(-) diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index 0b7b9bf011..ce8a4a5665 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -333,6 +333,7 @@ def _import_one_helper( Args: See _import_one. """ + import_name = import_spec['import_name'] urls = import_spec.get('data_download_url') if urls: for url in urls: @@ -370,20 +371,23 @@ def _import_one_helper( ) else: # Run import script locally. + script_interpreter = _get_script_interpreter( + script_path, interpreter_path) process = _run_user_script( - interpreter_path=interpreter_path, + interpreter_path=script_interpreter, script_path=script_path, timeout=self.config.user_script_timeout, args=self.config.user_script_args, cwd=absolute_import_dir, env=self.config.user_script_env, + name=import_name, ) _log_process(process=process) process.check_returncode() inputs = self._upload_import_inputs( import_dir=absolute_import_dir, - output_dir=f'{relative_import_dir}/{import_spec["import_name"]}', + output_dir=f'{relative_import_dir}/{import_name}', version=version, import_spec=import_spec, ) @@ -527,7 +531,8 @@ def run_and_handle_exception( def _run_with_timeout_async(args: List[str], timeout: float, cwd: str = None, - env: dict = None) -> subprocess.CompletedProcess: + env: dict = None, + name: str = None) -> subprocess.CompletedProcess: """Runs a command in a subprocess asynchronously and emits the stdout/stderr. Args: @@ -543,8 +548,8 @@ def _run_with_timeout_async(args: List[str], """ try: logging.info( - f'Launching async command: {args} with timeout {timeout} in {cwd}, env:' - f' {env}') + f'Launching async command for {name}: {args} ' + f'with timeout {timeout} in {cwd}, env: {env}') start_time = time.time() stdout = [] stderr = [] @@ -559,17 +564,20 @@ def _run_with_timeout_async(args: List[str], # Log output continuously until the command completes. for line in process.stderr: stderr.append(line) - logging.info(f'Process stderr: {line}') + logging.info(f'Process stderr:{name}: {line}') for line in process.stdout: stdout.append(line) - logging.info(f'Process stdout: {line}') + logging.info(f'Process stdout:{name}: {line}') + # Wait in case script has closed stderr/stdout early. + process.wait() end_time = time.time() return_code = process.returncode end_msg = ( - f'Completed script: "{args}", Return code: {return_code}, time:' - f' {end_time - start_time:.3f} secs.\n') + f'Completed script:{name}: "{args}", ' + f'Return code: {return_code}, ' + f'time: {end_time - start_time:.3f} secs.\n') logging.info(end_msg) return subprocess.CompletedProcess( args=args, @@ -580,8 +588,8 @@ def _run_with_timeout_async(args: List[str], except Exception as e: message = traceback.format_exc() logging.exception( - f'An unexpected exception was thrown: {e} when running {args}:' - f' {message}') + f'An unexpected exception was thrown: {e} when running {name}:' + f'{args}: {message}') return subprocess.CompletedProcess( args=args, returncode=1, @@ -668,6 +676,27 @@ def _create_venv(requirements_path: Iterable[str], venv_dir: str, return os.path.join(venv_dir, 'bin/python3'), process +def _get_script_interpreter(script: str, py_interpreter: str) -> str: + """Returns the interpreter for the script. + + Args: + script: user script to be executed + py_interpreter: Path to python within virtual environment + + Returns: + interpreter for user script, such as python for .py, bash for .sh + """ + if not script: + return py_interpreter + + cmd = script.split(' ')[0] + if cmd.endswith('.sh'): + # Use bash for shell scripts. + return 'bash' + + return py_interpreter + + def _run_user_script( interpreter_path: str, script_path: str, @@ -675,6 +704,7 @@ def _run_user_script( args: list = None, cwd: str = None, env: dict = None, + name: str = None, ) -> subprocess.CompletedProcess: """Runs a user Python script. @@ -687,6 +717,7 @@ def _run_user_script( the command line. cwd: Current working directory of the process as a string. env: Dict of environment variables for the user script run. + name: Name of the script. Returns: subprocess.CompletedProcess object used to run the script. @@ -699,7 +730,7 @@ def _run_user_script( script_args.extend(script_path.split(' ')) if args: script_args.extend(args) - return _run_with_timeout_async(script_args, timeout, cwd, env) + return _run_with_timeout_async(script_args, timeout, cwd, env, name) def _clean_time( diff --git a/import-automation/executor/schedule_update_import.py b/import-automation/executor/schedule_update_import.py index ee90e053e8..1545e94860 100644 --- a/import-automation/executor/schedule_update_import.py +++ b/import-automation/executor/schedule_update_import.py @@ -54,6 +54,8 @@ 'A string specifying the path of an import in the following format:' ':.' 'Example: scripts/us_usda/quickstats:UsdaAgSurvey') +flags.DEFINE_string('config_override', _CONFIG_OVERRIDE_FILE, + 'Config file with overridden parameters.') _FLAGS(sys.argv) @@ -301,8 +303,8 @@ def main(_): cfg.gcp_project_id = _FLAGS.gke_project_id logging.info( - f'Updating any config fields from local file: {_CONFIG_OVERRIDE_FILE}.') - cfg = _override_configs(_CONFIG_OVERRIDE_FILE, cfg) + f'Updating any config fields from local file: {_FLAGS.config_override}.') + cfg = _override_configs(_FLAGS.config_override, cfg) logging.info('Reading Cloud scheduler configs from GCS.') scheduler_config_dict = _get_cloud_config(_FLAGS.scheduler_config_filename) From 93abafe120c56c6c8d56f6c5010e0d8c86e7507a Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Thu, 26 Dec 2024 19:55:49 +0530 Subject: [PATCH 2/5] support scripts without extension --- .../executor/app/executor/import_executor.py | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index ce8a4a5665..8f088ccff3 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -687,12 +687,17 @@ def _get_script_interpreter(script: str, py_interpreter: str) -> str: interpreter for user script, such as python for .py, bash for .sh """ if not script: - return py_interpreter - - cmd = script.split(' ')[0] - if cmd.endswith('.sh'): - # Use bash for shell scripts. - return 'bash' + return None + + base, ext = os.path.splitext(script.split(' ')[0]) + match ext: + case '.py': + return py_interpreter + case '.sh': + return 'bash' + case _: + logging.info(f'Unknown extension for script: {script}.') + return None return py_interpreter @@ -726,7 +731,9 @@ def _run_user_script( subprocess.TimeoutExpired: The user script did not finish within timeout. """ - script_args = [interpreter_path] + script_args = [] + if interpreter_path: + script_args.append(interpreter_path) script_args.extend(script_path.split(' ')) if args: script_args.extend(args) From 53b051ba23a58e3bdc12efeee233b43552e5ea34 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Thu, 26 Dec 2024 19:56:55 +0530 Subject: [PATCH 3/5] lint --- .../executor/app/executor/import_executor.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index 8f088ccff3..92b0d57013 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -547,9 +547,8 @@ def _run_with_timeout_async(args: List[str], Same exceptions as subprocess.run. """ try: - logging.info( - f'Launching async command for {name}: {args} ' - f'with timeout {timeout} in {cwd}, env: {env}') + logging.info(f'Launching async command for {name}: {args} ' + f'with timeout {timeout} in {cwd}, env: {env}') start_time = time.time() stdout = [] stderr = [] @@ -574,10 +573,9 @@ def _run_with_timeout_async(args: List[str], end_time = time.time() return_code = process.returncode - end_msg = ( - f'Completed script:{name}: "{args}", ' - f'Return code: {return_code}, ' - f'time: {end_time - start_time:.3f} secs.\n') + end_msg = (f'Completed script:{name}: "{args}", ' + f'Return code: {return_code}, ' + f'time: {end_time - start_time:.3f} secs.\n') logging.info(end_msg) return subprocess.CompletedProcess( args=args, @@ -733,7 +731,7 @@ def _run_user_script( """ script_args = [] if interpreter_path: - script_args.append(interpreter_path) + script_args.append(interpreter_path) script_args.extend(script_path.split(' ')) if args: script_args.extend(args) From ce0f1c6415ce25f6dadcb2dd4f7677766c645d22 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Thu, 26 Dec 2024 19:58:32 +0530 Subject: [PATCH 4/5] comment --- import-automation/executor/app/executor/import_executor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index 92b0d57013..52eb2e1f48 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -683,6 +683,7 @@ def _get_script_interpreter(script: str, py_interpreter: str) -> str: Returns: interpreter for user script, such as python for .py, bash for .sh + Returns None if the script has no extension. """ if not script: return None From da11a7f5030361d8a62ce15260ee4121731ac88e Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Thu, 26 Dec 2024 20:13:54 +0530 Subject: [PATCH 5/5] comment --- import-automation/executor/schedule_update_import.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/import-automation/executor/schedule_update_import.py b/import-automation/executor/schedule_update_import.py index 1545e94860..b815faca52 100644 --- a/import-automation/executor/schedule_update_import.py +++ b/import-automation/executor/schedule_update_import.py @@ -303,7 +303,8 @@ def main(_): cfg.gcp_project_id = _FLAGS.gke_project_id logging.info( - f'Updating any config fields from local file: {_FLAGS.config_override}.') + f'Updating any config fields from local file: {_FLAGS.config_override}.' + ) cfg = _override_configs(_FLAGS.config_override, cfg) logging.info('Reading Cloud scheduler configs from GCS.')