From 2c3bdb2f57c6b7aa7905f2154f23ba10585b2861 Mon Sep 17 00:00:00 2001 From: Ajai Date: Wed, 6 Dec 2023 07:49:21 +0000 Subject: [PATCH 1/2] Set command line default to small import temporarily (#937) * set defaults to single year, limited counties to temporarily have script finish in <1 for autorefresh test --- scripts/us_usda/quickstats/process.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/scripts/us_usda/quickstats/process.py b/scripts/us_usda/quickstats/process.py index 90768e668c..8520d0677c 100644 --- a/scripts/us_usda/quickstats/process.py +++ b/scripts/us_usda/quickstats/process.py @@ -64,14 +64,17 @@ flags.DEFINE_string(_USDA_API_KEY, None, 'USDA quickstats API key.') flags.DEFINE_integer( 'start_year', - os.getenv('start_year', 2000), - 'Year from whihc data is processed.', + os.getenv('start_year', + datetime.now().year), + 'Year from which data is processed.', ) flags.DEFINE_integer( 'num_counties', - os.getenv('num_counties', 5000), + os.getenv('num_counties', 100), 'number of counties for which data is processed.', ) +flags.DEFINE_string('output_dir', 'output', + 'Output directory for generated files.') def process_survey_data(year, svs, out_dir): @@ -295,7 +298,7 @@ def get_multiple_years(): start = datetime.now() print('Start', start) - out_dir = 'output' + out_dir = _FLAGS.output_dir svs = load_svs() years = range(_FLAGS.start_year, datetime.now().year + 1) for year in years: From 355d43d12218a14fc7ee9de4da76abced71535d3 Mon Sep 17 00:00:00 2001 From: Ajai Date: Wed, 6 Dec 2023 13:50:39 +0000 Subject: [PATCH 2/2] additional options for import-automation (#943) * add options for user script args and env * lint fix * add cwd to user script launcher --- import-automation/executor/app/configs.py | 14 +- .../executor/app/executor/import_executor.py | 823 +++++++++++------- import-automation/executor/local_executor.py | 5 +- 3 files changed, 498 insertions(+), 344 deletions(-) diff --git 
a/import-automation/executor/app/configs.py b/import-automation/executor/app/configs.py index b1d37a537e..14d6e10bb9 100644 --- a/import-automation/executor/app/configs.py +++ b/import-automation/executor/app/configs.py @@ -11,16 +11,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -""" -Configurations for the executor. +"""Configurations for the executor. The app endpoints accept a configs field that allows customization of all the configurations. See main.py. """ +import dataclasses import os from typing import List -import dataclasses from google.cloud import logging @@ -32,6 +31,7 @@ def _production(): @dataclasses.dataclass class ExecutorConfig: """Configurations for the executor.""" + # ID of the Google Cloud project that hosts the executor. The project # needs to enable App Engine and Cloud Scheduler. gcp_project_id: str = 'google.com:datcom-data' @@ -102,9 +102,13 @@ class ExecutorConfig: # ID of the location where Cloud Scheduler is hosted. scheduler_location: str = 'us-central1' # Maximum time a user script can run for in seconds. - user_script_timeout: float = 600 + user_script_timeout: float = 3600 + # Arguments for the user script + user_script_args: List[str] = () + # Environment variables for the user script + user_script_env: dict = None # Maximum time venv creation can take in seconds. - venv_create_timeout: float = 600 + venv_create_timeout: float = 3600 # Maximum time downloading a file can take in seconds. file_download_timeout: float = 600 # Maximum time downloading the repo can take in seconds. 
diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index b8e5e7ea97..3b20271467 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ b/import-automation/executor/app/executor/import_executor.py @@ -11,31 +11,32 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -""" -Import executor that downloads GitHub repositories and executes data imports +"""Import executor that downloads GitHub repositories and executes data imports + based on manifests. """ +import dataclasses import json +import logging import os import subprocess import tempfile -import logging +import time import traceback -from typing import Tuple, List, Dict, Optional, Callable, Iterable -import dataclasses +from typing import Callable, Dict, Iterable, List, Optional, Tuple -from app import utils from app import configs -from app.service import dashboard_api +from app import utils from app.executor import import_target -from app.service import github_api -from app.service import file_uploader +from app.service import dashboard_api from app.service import email_notifier +from app.service import file_uploader +from app.service import github_api from app.service import import_service -_SYSTEM_RUN_INIT_FAILED_MESSAGE = ('Failed to initialize the system run ' - 'with the import progress dashboard') +_SYSTEM_RUN_INIT_FAILED_MESSAGE = ( + 'Failed to initialize the system run with the import progress dashboard') _SEE_DASHBOARD_MESSAGE = ( 'See dashboard for logs: ' @@ -45,6 +46,7 @@ @dataclasses.dataclass class ExecutionResult: """Describes the result of the execution of an import.""" + # Status of the execution, one of 'succeeded', 'failed', or 'pass' status: str # Absolute import names of the imports executed @@ -55,12 +57,12 @@ class ExecutionResult: class 
ExecutionError(Exception): """Exception to signal that an error has occurred during the execution - of an import. - Attributes: - result: ExecutionResult object describing the result - of the execution. - """ + of an import. + + Attributes: + result: ExecutionResult object describing the result of the execution. + """ def __init__(self, execution_result: ExecutionResult): super().__init__() @@ -69,29 +71,31 @@ def __init__(self, execution_result: ExecutionResult): class ImportExecutor: """Import executor that downloads GitHub repositories and executes - data imports based on manifests. - - Attributes: - uploader: FileUploader object for uploading the generated data files - to some place. - github: GitHubRepoAPI object for communicating with GitHUB API. - config: ExecutorConfig object containing configurations - for the execution. - dashboard: DashboardAPI object for communicating with the - import progress dashboard. If not provided, the executor will not - communicate with the dashboard. - notifier: EmailNotifier object for sending notificaiton emails. - importer: ImportServiceClient object for invoking the - Data Commons importer. - """ - def __init__(self, - uploader: file_uploader.FileUploader, - github: github_api.GitHubRepoAPI, - config: configs.ExecutorConfig, - dashboard: dashboard_api.DashboardAPI = None, - notifier: email_notifier.EmailNotifier = None, - importer: 'import_service.ImportServiceClient' = None): + data imports based on manifests. + + Attributes: + uploader: FileUploader object for uploading the generated data files to + some place. + github: GitHubRepoAPI object for communicating with GitHub API. + config: ExecutorConfig object containing configurations for the execution. + dashboard: DashboardAPI object for communicating with the import progress + dashboard. If not provided, the executor will not communicate with the + dashboard. + notifier: EmailNotifier object for sending notification emails. 
+ importer: ImportServiceClient object for invoking the Data Commons + importer. + """ + + def __init__( + self, + uploader: file_uploader.FileUploader, + github: github_api.GitHubRepoAPI, + config: configs.ExecutorConfig, + dashboard: dashboard_api.DashboardAPI = None, + notifier: email_notifier.EmailNotifier = None, + importer: 'import_service.ImportServiceClient' = None, + ): self.uploader = uploader self.github = github self.config = config @@ -99,55 +103,63 @@ def __init__(self, self.notifier = notifier self.importer = importer - def execute_imports_on_commit(self, - commit_sha: str, - repo_name: str = None, - branch_name: str = None, - pr_number: str = None) -> ExecutionResult: + def execute_imports_on_commit( + self, + commit_sha: str, + repo_name: str = None, + branch_name: str = None, + pr_number: str = None, + ) -> ExecutionResult: """Executes imports upon a GitHub commit. - repo_name, branch_name, and pr_number are used only for logging to - the import progress dashboard. - - Args: - commit_sha: ID of the commit as a string. - repo_name: Name of the repository the commit is for as a string. - branch_name: Name of the branch the commit is for as a string. - pr_number: If the commit is a part of a pull request, the number of - the pull request as an int. - Returns: - ExecutionResult object describing the results of the imports. - """ + repo_name, branch_name, and pr_number are used only for logging to + the import progress dashboard. + + Args: + commit_sha: ID of the commit as a string. + repo_name: Name of the repository the commit is for as a string. + branch_name: Name of the branch the commit is for as a string. + pr_number: If the commit is a part of a pull request, the number of the + pull request as an int. + + Returns: + ExecutionResult object describing the results of the imports. 
+ """ run_id = None try: if self.dashboard: - run_id = _init_run_helper(dashboard=self.dashboard, - commit_sha=commit_sha, - repo_name=repo_name, - branch_name=branch_name, - pr_number=pr_number)['run_id'] + run_id = _init_run_helper( + dashboard=self.dashboard, + commit_sha=commit_sha, + repo_name=repo_name, + branch_name=branch_name, + pr_number=pr_number, + )['run_id'] except Exception: logging.exception(_SYSTEM_RUN_INIT_FAILED_MESSAGE) return _create_system_run_init_failed_result(traceback.format_exc()) - return run_and_handle_exception(run_id, self.dashboard, - self._execute_imports_on_commit_helper, - commit_sha, run_id) + return run_and_handle_exception( + run_id, + self.dashboard, + self._execute_imports_on_commit_helper, + commit_sha, + run_id, + ) def execute_imports_on_update(self, absolute_import_name: str) -> ExecutionResult: """Executes imports upon a scheduled update. - Args: - absolute_import_name: Absolute import name of the imports to - execute of the form : - as a string. E.g., scripts/us_fed/treasury:USFed_MaturityRates. - can be 'all' to execute all imports within - the directory. + Args: + absolute_import_name: Absolute import name of the imports to execute of + the form : as a string. E.g., + scripts/us_fed/treasury:USFed_MaturityRates. can be + 'all' to execute all imports within the directory. - Returns: - ExecutionResult object describing the results of the imports. - """ + Returns: + ExecutionResult object describing the results of the imports. 
+ """ run_id = None try: if self.dashboard: @@ -156,9 +168,13 @@ def execute_imports_on_update(self, logging.exception(_SYSTEM_RUN_INIT_FAILED_MESSAGE) return _create_system_run_init_failed_result(traceback.format_exc()) - return run_and_handle_exception(run_id, self.dashboard, - self._execute_imports_on_update_helper, - absolute_import_name, run_id) + return run_and_handle_exception( + run_id, + self.dashboard, + self._execute_imports_on_update_helper, + absolute_import_name, + run_id, + ) def _execute_imports_on_update_helper( self, @@ -166,17 +182,17 @@ def _execute_imports_on_update_helper( run_id: str = None) -> ExecutionResult: """Helper for execute_imports_on_update. - Args: - absolute_import_name: See execute_imports_on_update. - run_id: ID of the system run as a string. This is only used to - communicate with the import progress dashboard. + Args: + absolute_import_name: See execute_imports_on_update. + run_id: ID of the system run as a string. This is only used to + communicate with the import progress dashboard. - Returns: - ExecutionResult object describing the results of the executions. + Returns: + ExecutionResult object describing the results of the executions. - Raises: - ExecutionError: The execution of an import failed for any reason. - """ + Raises: + ExecutionError: The execution of an import failed for any reason. + """ logging.info('%s: BEGIN', absolute_import_name) with tempfile.TemporaryDirectory() as tmpdir: logging.info('%s: downloading repo', absolute_import_name) @@ -209,7 +225,8 @@ def _execute_imports_on_update_helper( relative_import_dir=import_dir, absolute_import_dir=absolute_import_dir, import_spec=spec, - run_id=run_id) + run_id=run_id, + ) except Exception: raise ExecutionError( ExecutionResult('failed', executed_imports, @@ -227,17 +244,16 @@ def _execute_imports_on_commit_helper(self, ) -> ExecutionResult: """Helper for execute_imports_on_commit. - Args: - See execute_imports_on_commit. 
- run_id: ID of the system run as a string. This is only used to - communicate with the import progress dashboard. + Args: See execute_imports_on_commit. + run_id: ID of the system run as a string. This is only used to + communicate with the import progress dashboard. - Returns: - ExecutionResult object describing the results of the executions. + Returns: + ExecutionResult object describing the results of the executions. - Raises: - ExecutionError: The execution of an import failed for any reason. - """ + Raises: + ExecutionError: The execution of an import failed for any reason. + """ # Import targets specified in the commit message, # e.g., 'scripts/us_fed/treasury:constant_maturity', 'constant_maturity' @@ -262,17 +278,20 @@ def _execute_imports_on_commit_helper(self, targets=targets, manifest_dirs=manifest_dirs, manifest_filename=self.config.manifest_filename, - repo_dir=repo_dir) + repo_dir=repo_dir, + ) executed_imports = [] for relative_dir, spec in imports_to_execute: try: - self._import_one(repo_dir=repo_dir, - relative_import_dir=relative_dir, - absolute_import_dir=os.path.join( - repo_dir, relative_dir), - import_spec=spec, - run_id=run_id) + self._import_one( + repo_dir=repo_dir, + relative_import_dir=relative_dir, + absolute_import_dir=os.path.join( + repo_dir, relative_dir), + import_spec=spec, + run_id=run_id, + ) except Exception: raise ExecutionError( @@ -291,24 +310,26 @@ def _execute_imports_on_commit_helper(self, return ExecutionResult('succeeded', executed_imports, 'No issues') - def _import_one(self, - repo_dir: str, - relative_import_dir: str, - absolute_import_dir: str, - import_spec: dict, - run_id: str = None) -> None: + def _import_one( + self, + repo_dir: str, + relative_import_dir: str, + absolute_import_dir: str, + import_spec: dict, + run_id: str = None, + ) -> None: """Executes an import. - Args: - repo_dir: Absolute path to the repository, as a string. 
- relative_import_dir: Path to the directory containing the manifest - as a string, relative to the root directory of the repository. - absolute_import_dir: Absolute path to the directory containing - the manifest as a string. - import_spec: Specification of the import as a dict. - run_id: ID of the system run that executes the import. This is only - used to communicate with the import progress dashboard. - """ + Args: + repo_dir: Absolute path to the repository, as a string. + relative_import_dir: Path to the directory containing the manifest as a + string, relative to the root directory of the repository. + absolute_import_dir: Absolute path to the directory containing the + manifest as a string. + import_spec: Specification of the import as a dict. + run_id: ID of the system run that executes the import. This is only used + to communicate with the import progress dashboard. + """ import_name = import_spec['import_name'] absolute_import_name = import_target.get_absolute_import_name( relative_import_dir, import_name) @@ -321,50 +342,58 @@ def _import_one(self, import_name=import_name, absolute_import_name=absolute_import_name, provenance_url=import_spec['provenance_url'], - provenance_description=import_spec['provenance_description']) + provenance_description=import_spec['provenance_description'], + ) attempt_id = attempt['attempt_id'] try: - self._import_one_helper(repo_dir=repo_dir, - relative_import_dir=relative_import_dir, - absolute_import_dir=absolute_import_dir, - import_spec=import_spec, - run_id=run_id, - attempt_id=attempt_id) + self._import_one_helper( + repo_dir=repo_dir, + relative_import_dir=relative_import_dir, + absolute_import_dir=absolute_import_dir, + import_spec=import_spec, + run_id=run_id, + attempt_id=attempt_id, + ) if self.notifier: self.notifier.send( - subject=(f'Import Automation - {absolute_import_name} ' - f'- Succeeded'), + subject= + f'Import Automation - {absolute_import_name} - Succeeded', body=_SEE_DASHBOARD_MESSAGE, - 
receiver_addresses=curator_emails) + receiver_addresses=curator_emails, + ) except Exception as exc: if self.dashboard: - _mark_import_attempt_failed(attempt_id=attempt_id, - message=traceback.format_exc(), - dashboard=self.dashboard) + _mark_import_attempt_failed( + attempt_id=attempt_id, + message=traceback.format_exc(), + dashboard=self.dashboard, + ) if self.notifier: self.notifier.send( - subject=(f'Import Automation - {absolute_import_name} ' - f'- Failed'), + subject= + f'Import Automation - {absolute_import_name} - Failed', body=_SEE_DASHBOARD_MESSAGE, - receiver_addresses=curator_emails) + receiver_addresses=curator_emails, + ) raise exc - def _import_one_helper(self, - repo_dir: str, - relative_import_dir: str, - absolute_import_dir: str, - import_spec: dict, - run_id: str = None, - attempt_id: str = None) -> None: + def _import_one_helper( + self, + repo_dir: str, + relative_import_dir: str, + absolute_import_dir: str, + import_spec: dict, + run_id: str = None, + attempt_id: str = None, + ) -> None: """Helper for _import_one. - Args: - See _import_one. - attempt_id: ID of the import attempt executed by the system run - with the run_id, as a string. This is only used to communicate - with the import progress dashboard. - """ + Args: See _import_one. + attempt_id: ID of the import attempt executed by the system run with the + run_id, as a string. This is only used to communicate with the import + progress dashboard. 
+ """ urls = import_spec.get('data_download_url') if urls: for url in urls: @@ -383,12 +412,15 @@ def _import_one_helper(self, interpreter_path, process = _create_venv( (central_requirements_path, requirements_path), tmpdir, - timeout=self.config.venv_create_timeout) + timeout=self.config.venv_create_timeout, + ) - _log_process(process=process, - dashboard=self.dashboard, - attempt_id=attempt_id, - run_id=run_id) + _log_process( + process=process, + dashboard=self.dashboard, + attempt_id=attempt_id, + run_id=run_id, + ) process.check_returncode() script_paths = import_spec.get('scripts') @@ -397,18 +429,24 @@ def _import_one_helper(self, interpreter_path=interpreter_path, script_path=os.path.join(absolute_import_dir, path), timeout=self.config.user_script_timeout, - cwd=absolute_import_dir) - _log_process(process=process, - dashboard=self.dashboard, - attempt_id=attempt_id, - run_id=run_id) + args=self.config.user_script_args, + cwd=absolute_import_dir, + env=self.config.user_script_env, + ) + _log_process( + process=process, + dashboard=self.dashboard, + attempt_id=attempt_id, + run_id=run_id, + ) process.check_returncode() inputs = self._upload_import_inputs( import_dir=absolute_import_dir, output_dir=f'{relative_import_dir}/{import_spec["import_name"]}', import_inputs=import_spec.get('import_inputs', []), - attempt_id=attempt_id) + attempt_id=attempt_id, + ) if self.importer: self.importer.delete_previous_output(relative_import_dir, @@ -418,13 +456,15 @@ def _import_one_helper(self, self.dashboard.info( f'Submitting job to delete the previous import', attempt_id=attempt_id, - run_id=run_id) + run_id=run_id, + ) try: self.importer.delete_import( relative_import_dir, import_spec, block=True, - timeout=self.config.importer_delete_timeout) + timeout=self.config.importer_delete_timeout, + ) except import_service.ImportNotFoundError as exc: # If this is the first time executing this import, # there will be no previous import @@ -433,15 +473,18 @@ def 
_import_one_helper(self, self.dashboard.info(f'Deleted previous import', attempt_id=attempt_id, run_id=run_id) - self.dashboard.info(f'Submitting job to perform the import', - attempt_id=attempt_id, - run_id=run_id) + self.dashboard.info( + f'Submitting job to perform the import', + attempt_id=attempt_id, + run_id=run_id, + ) self.importer.smart_import( relative_import_dir, inputs, import_spec, block=True, - timeout=self.config.importer_import_timeout) + timeout=self.config.importer_import_timeout, + ) if self.dashboard: self.dashboard.info(f'Import succeeded', attempt_id=attempt_id, @@ -455,31 +498,32 @@ def _import_one_helper(self, }, attempt_id) def _upload_import_inputs( - self, - import_dir: str, - output_dir: str, - import_inputs: List[Dict[str, str]], - attempt_id: str = None) -> 'import_service.ImportInputs': + self, + import_dir: str, + output_dir: str, + import_inputs: List[Dict[str, str]], + attempt_id: str = None, + ) -> 'import_service.ImportInputs': """Uploads the generated import data files. - Data files are uploaded to //, where is a - time string and is written to / - after the uploads are complete. - - Args: - import_dir: Absolute path to the directory with the manifest, - as a string. - output_dir: Path to the output directory, as a string. - import_inputs: List of import inputs each as a dict mapping - import types to relative paths within the repository. This is - parsed from the 'import_inputs' field in the manifest. - attempt_id: ID of the import attempt executed by the system run - with the run_id, as a string. This is only used to communicate - with the import progress dashboard. - - Returns: - ImportInputs object containing the paths to the uploaded inputs. - """ + Data files are uploaded to //, where is a + time string and is written to / + after the uploads are complete. + + Args: + import_dir: Absolute path to the directory with the manifest, as a + string. + output_dir: Path to the output directory, as a string. 
+ import_inputs: List of import inputs each as a dict mapping import types + to relative paths within the repository. This is parsed from the + 'import_inputs' field in the manifest. + attempt_id: ID of the import attempt executed by the system run with the + run_id, as a string. This is only used to communicate with the import + progress dashboard. + + Returns: + ImportInputs object containing the paths to the uploaded inputs. + """ uploaded = import_service.ImportInputs() version = _clean_time(utils.pacific_time()) for import_input in import_inputs: @@ -487,9 +531,11 @@ def _upload_import_inputs( path = import_input.get(input_type) if path: dest = f'{output_dir}/{version}/{os.path.basename(path)}' - self._upload_file_helper(src=os.path.join(import_dir, path), - dest=dest, - attempt_id=attempt_id) + self._upload_file_helper( + src=os.path.join(import_dir, path), + dest=dest, + attempt_id=attempt_id, + ) setattr(uploaded, input_type, dest) self.uploader.upload_string( version, @@ -502,13 +548,13 @@ def _upload_file_helper(self, attempt_id: str = None) -> None: """Uploads a file from src to dest. - Args: - src: Path to the file to upload, as a string. - dest: Path to where the file is to be uploaded to, as a string. - attempt_id: ID of the import attempt executed by the system run - with the run_id, as a string. This is only used to communicate - with the import progress dashboard. - """ + Args: + src: Path to the file to upload, as a string. + dest: Path to where the file is to be uploaded to, as a string. + attempt_id: ID of the import attempt executed by the system run with the + run_id, as a string. This is only used to communicate with the import + progress dashboard. + """ if self.dashboard: with open(src) as file: self.dashboard.info( @@ -520,37 +566,40 @@ def _upload_file_helper(self, def parse_manifest(path: str) -> dict: """Parses the import manifest. - Args: - path: Path to the import manifest file as a string. 
+ Args: + path: Path to the import manifest file as a string. - Returns: - The parsed manifest as a dict. + Returns: + The parsed manifest as a dict. - Raises: - Same exceptions as open and json.load if the file does not exist or - contains malformed json. - """ + Raises: + Same exceptions as open and json.load if the file does not exist or + contains malformed json. + """ with open(path) as file: return json.load(file) -def run_and_handle_exception(run_id: Optional[str], - dashboard: Optional[dashboard_api.DashboardAPI], - exec_func: Callable, *args) -> ExecutionResult: +def run_and_handle_exception( + run_id: Optional[str], + dashboard: Optional[dashboard_api.DashboardAPI], + exec_func: Callable, + *args, +) -> ExecutionResult: """Runs a method that executes imports and handles its exceptions. - run_id and dashboard are for logging to the import progress dashboard. - They can be None to not perform such logging. + run_id and dashboard are for logging to the import progress dashboard. + They can be None to not perform such logging. - Args: - run_id: ID of the system run as a string. - dashboard: DashboardAPI for logging to the import progress dashboard. - exec_func: The method to execute. - args: List of arguments sent to exec_func. + Args: + run_id: ID of the system run as a string. + dashboard: DashboardAPI for logging to the import progress dashboard. + exec_func: The method to execute. + args: List of arguments sent to exec_func. - Returns: - ExecutionResult object describing the results of the imports. - """ + Returns: + ExecutionResult object describing the results of the imports. 
+ """ try: return exec_func(*args) except ExecutionError as exc: @@ -567,101 +616,191 @@ def run_and_handle_exception(run_id: Optional[str], return ExecutionResult('failed', [], message) +def _run_with_timeout_async(args: List[str], + timeout: float, + cwd: str = None, + env: dict = None) -> subprocess.CompletedProcess: + """Runs a command in a subprocess asynchronously and emits the stdout/stderr. + + Args: + args: Command to run as a list. Each element is a string. + timeout: Maximum time the command can run for in seconds as a float. + cwd: Current working directory of the process as a string. + + Returns: + subprocess.CompletedProcess object used to run the command. + + Raises: + Same exceptions as subprocess.run. + """ + try: + logging.info( + f'Launching async command: {args} with timeout {timeout} in {cwd}, env: {env}' + ) + start_time = time.time() + stdout = [] + stderr = [] + process = subprocess.Popen( + args, + cwd=cwd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=env, + ) + + # Log output continuously until the command completes. 
+ for line in process.stdout: + stdout.append(line) + logging.info(f'Process stdout: {line}') + for line in process.stderr: + stderr.append(line) + logging.info(f'Process stderr: {line}') + + end_time = time.time() + + return_code = process.returncode + end_msg = ( + f'Completed script: "{args}", Return code: {return_code}, time:' + f' {end_time - start_time:.3f} secs.\n') + logging.info(end_msg) + return subprocess.CompletedProcess( + args=args, + returncode=return_code, + stdout=b''.join(stdout), + stderr=b''.join(stderr), + ) + except Exception as e: + message = traceback.format_exc() + logging.exception( + f'An unexpected exception was thrown: {e} when running {args}:' + f' {message}') + return subprocess.CompletedProcess( + args=args, + returncode=1, + stdout=b''.join(stdout), + stderr=b''.join(stderr), + ) + + def _run_with_timeout(args: List[str], timeout: float, - cwd: str = None) -> subprocess.CompletedProcess: + cwd: str = None, + env: dict = None) -> subprocess.CompletedProcess: """Runs a command in a subprocess. - Args: - args: Command to run as a list. Each element is a string. - timeout: Maximum time the command can run for in seconds as a float. - cwd: Current working directory of the process as a string. + Args: + args: Command to run as a list. Each element is a string. + timeout: Maximum time the command can run for in seconds as a float. + cwd: Current working directory of the process as a string. + env: Dict of environment variables for the command. - Returns: - subprocess.CompletedProcess object used to run the command. + Returns: + subprocess.CompletedProcess object used to run the command. - Raises: - Same exceptions as subprocess.run. - """ - return subprocess.run(args, - capture_output=True, - text=True, - timeout=timeout, - cwd=cwd) + Raises: + Same exceptions as subprocess.run. 
+ """ + try: + logging.info( + f'Running command: {args} with timeout {timeout} in dir:{cwd} with' + f' Env:{env}') + process = subprocess.run(args, + capture_output=True, + text=True, + timeout=timeout, + cwd=cwd, + env=env) + logging.info( + f'Completed command: {args}, retcode: {process.returncode}, stdout:' + f' {process.stdout}, stderr: {process.stderr}') + return process + except Exception as e: + message = traceback.format_exc() + logging.exception( + f'An unexpected exception was thrown: {e} when running {args}:' + f' {message}') + return None def _create_venv(requirements_path: Iterable[str], venv_dir: str, timeout: float) -> Tuple[str, subprocess.CompletedProcess]: """Creates a Python virtual environment. - The virtual environment is created with --system-site-packages set, - which allows it to access modules installed on the host. This provides - the opportunity to use the requirements.txt file for this project as - a central requirement file for all user scripts. - - Args: - requirements_path: List of paths to pip requirement files listing the - dependencies to install, each as a string. - venv_dir: Path to the directory to create the virtual environment in - as a string. - timeout: Maximum time the creation script can run for in seconds - as a float. - - Returns: - A tuple consisting of the path to the created interpreter as a string - and a subprocess.CompletedProcess object used to create the environment. - - Raises: - Same exceptions as subprocess.run. - """ + The virtual environment is created with --system-site-packages set, + which allows it to access modules installed on the host. This provides + the opportunity to use the requirements.txt file for this project as + a central requirement file for all user scripts. + + Args: + requirements_path: List of paths to pip requirement files listing the + dependencies to install, each as a string. + venv_dir: Path to the directory to create the virtual environment in as a + string. 
+ timeout: Maximum time the creation script can run for in seconds as a + float. + + Returns: + A tuple consisting of the path to the created interpreter as a string + and a subprocess.CompletedProcess object used to create the environment. + + Raises: + Same exceptions as subprocess.run. + """ with tempfile.NamedTemporaryFile(mode='w', suffix='.sh') as script: script.write(f'python3 -m venv --system-site-packages {venv_dir}\n') script.write(f'. {venv_dir}/bin/activate\n') for path in requirements_path: if os.path.exists(path): - script.write('python3 -m pip install --no-cache-dir ' - f'--requirement {path}\n') + script.write( + f'python3 -m pip install --no-cache-dir --requirement {path}\n' + ) script.flush() process = _run_with_timeout(['bash', script.name], timeout) return os.path.join(venv_dir, 'bin/python3'), process -def _run_user_script(interpreter_path: str, - script_path: str, - timeout: float, - args: list = None, - cwd: str = None) -> subprocess.CompletedProcess: +def _run_user_script( + interpreter_path: str, + script_path: str, + timeout: float, + args: list = None, + cwd: str = None, + env: dict = None, +) -> subprocess.CompletedProcess: """Runs a user Python script. - Args: - script_path: Path to the user script to run as a string. - interpreter_path: Path to the Python interpreter to run the - user script as a string. - timeout: Maximum time the user script can run for in seconds - as a float. - args: A list of arguments each as a string to pass to the - user script on the command line. - cwd: Current working directory of the process as a string. - - Returns: - subprocess.CompletedProcess object used to run the script. - - Raises: - subprocess.TimeoutExpired: The user script did not finish - within timeout. 
- """ - if args is None: - args = [] - return _run_with_timeout([interpreter_path, script_path] + list(args), - timeout, cwd) - - -def _init_run_helper(dashboard: dashboard_api.DashboardAPI, - commit_sha: str = None, - repo_name: str = None, - branch_name: str = None, - pr_number: str = None) -> Dict: + Args: + script_path: Path to the user script to run as a string. + interpreter_path: Path to the Python interpreter to run the user script as + a string. + timeout: Maximum time the user script can run for in seconds as a float. + args: A list of arguments each as a string to pass to the user script on + the command line. + cwd: Current working directory of the process as a string. + env: Dict of environment variables for the user script run. + + Returns: + subprocess.CompletedProcess object used to run the script. + + Raises: + subprocess.TimeoutExpired: The user script did not finish + within timeout. + """ + script_args = [interpreter_path] + script_args.extend(script_path.split(' ')) + if args: + script_args.extend(args) + return _run_with_timeout_async(script_args, timeout, cwd, env) + + +def _init_run_helper( + dashboard: dashboard_api.DashboardAPI, + commit_sha: str = None, + repo_name: str = None, + branch_name: str = None, + pr_number: str = None, +) -> Dict: """Initializes a system run with the import progress dashboard.""" run = {} if commit_sha: @@ -675,34 +814,39 @@ def _init_run_helper(dashboard: dashboard_api.DashboardAPI, return dashboard.init_run(run) -def _init_attempt_helper(dashboard: dashboard_api.DashboardAPI, run_id: str, - import_name: str, absolute_import_name: str, - provenance_url: str, - provenance_description: str) -> Dict: +def _init_attempt_helper( + dashboard: dashboard_api.DashboardAPI, + run_id: str, + import_name: str, + absolute_import_name: str, + provenance_url: str, + provenance_description: str, +) -> Dict: """Initializes an import attempt with the import progress dashboard.""" return dashboard.init_attempt({ 'run_id': run_id, 
'import_name': import_name, 'absolute_import_name': absolute_import_name, 'provenance_url': provenance_url, - 'provenance_description': provenance_description + 'provenance_description': provenance_description, }) def _mark_system_run_failed(run_id: str, message: str, dashboard: dashboard_api.DashboardAPI) -> Dict: """Communicates with the import progress dashboard that a system run - has failed. - Args: - run_id: ID of the system run. - message: An additional message to log to the dashboard - with level critical. - dashboard: DashboardAPI object for the communicaiton. + has failed. - Returns: - Updated system run returned from the dashboard. - """ + Args: + run_id: ID of the system run. + message: An additional message to log to the dashboard with level + critical. + dashboard: DashboardAPI object for the communication. + + Returns: + Updated system run returned from the dashboard. + """ dashboard.critical(message, run_id=run_id) return dashboard.update_run( { @@ -714,17 +858,18 @@ def _mark_system_run_failed(run_id: str, message: str, def _mark_import_attempt_failed(attempt_id: str, message: str, dashboard: dashboard_api.DashboardAPI) -> Dict: """Communicates with the import progress dashboard that an import attempt - has failed. - Args: - attempt_id: ID of the import attempt. - message: An additional message to log to the dashboard - with level critical. - dashboard: DashboardAPI object for the communicaiton. + has failed. - Returns: - Updated import attempt returned from the dashboard. - """ + Args: + attempt_id: ID of the import attempt. + message: An additional message to log to the dashboard with level + critical. + dashboard: DashboardAPI object for the communication. + + Returns: + Updated import attempt returned from the dashboard. 
+ """ dashboard.critical(message, attempt_id=attempt_id) return dashboard.update_attempt( { @@ -743,13 +888,13 @@ def _clean_time( time: str, chars_to_replace: Tuple[str] = (':', '-', '.', '+')) -> str: """Replaces some characters with underscores. - Args: - time: Time string. - chars_to_replace: List of characters to replace with underscores. + Args: + time: Time string. + chars_to_replace: List of characters to replace with underscores. - Returns: - Time string with the characters replaced. - """ + Returns: + Time string with the characters replaced. + """ for char in chars_to_replace: time = time.replace(char, '_') return time @@ -759,11 +904,11 @@ def _construct_process_message(message: str, process: subprocess.CompletedProcess) -> str: """Constructs a log message describing the result of a subprocess. - Args: - message: Brief log message as a string. - process: subprocess.CompletedProcess object whose arguments, - return code, stdout, and stderr are to be added to the message. - """ + Args: + message: Brief log message as a string. + process: subprocess.CompletedProcess object whose arguments, return code, + stdout, and stderr are to be added to the message. 
+ """ command = process.args if isinstance(command, list): command = utils.list_to_str(command, ' ') @@ -771,29 +916,31 @@ def _construct_process_message(message: str, f'[Subprocess command]: {command}\n' f'[Subprocess return code]: {process.returncode}') if process.stdout: - message += '\n[Subprocess stdout]:\n' f'{process.stdout}' + message += f'\n[Subprocess stdout]:\n{process.stdout}' if process.stderr: - message += '\n[Subprocess stderr]:\n' f'{process.stderr}' + message += f'\n[Subprocess stderr]:\n{process.stderr}' return message -def _log_process(process: subprocess.CompletedProcess, - dashboard: dashboard_api.DashboardAPI = None, - attempt_id: str = None, - run_id: str = None) -> None: +def _log_process( + process: subprocess.CompletedProcess, + dashboard: dashboard_api.DashboardAPI = None, + attempt_id: str = None, + run_id: str = None, +) -> None: """Logs the result of a subprocess. - dashboard, attempt_id, and run_id are only for logging to the import - progress dashboard. They can be None to not perform such logging. - - Args: - process: subprocess.CompletedProcess object whose arguments, - return code, stdout, and stderr are to be logged. - dashboard: DashboardAPI object to communicate with the - import progress dashboard. - attempt_id: ID of the import attempt as a string. - run_id: ID of the system run as a string. - """ + dashboard, attempt_id, and run_id are only for logging to the import + progress dashboard. They can be None to not perform such logging. + + Args: + process: subprocess.CompletedProcess object whose arguments, return code, + stdout, and stderr are to be logged. + dashboard: DashboardAPI object to communicate with the import progress + dashboard. + attempt_id: ID of the import attempt as a string. + run_id: ID of the system run as a string. 
+ """ message = 'Subprocess succeeded' if process.returncode: message = 'Subprocess failed' diff --git a/import-automation/executor/local_executor.py b/import-automation/executor/local_executor.py index 16bec15d58..adbcc4f232 100644 --- a/import-automation/executor/local_executor.py +++ b/import-automation/executor/local_executor.py @@ -29,6 +29,8 @@ and 'data' is the repo_name. """ +import os + from absl import flags from absl import app @@ -85,7 +87,8 @@ def main(_): github_repo_name=FLAGS.repo_name, github_repo_owner_username=FLAGS.owner_username, github_auth_username=FLAGS.username, - github_auth_access_token=FLAGS.access_token) + github_auth_access_token=FLAGS.access_token, + user_script_env=os.environ) executor = import_executor.ImportExecutor( uploader=file_uploader.LocalFileUploader(output_dir=FLAGS.output_dir), github=github_api.GitHubRepoAPI(config.github_repo_owner_username,