diff --git a/import-automation/executor/app/configs.py b/import-automation/executor/app/configs.py
index 0df79f0c66..d829502d56 100644
--- a/import-automation/executor/app/configs.py
+++ b/import-automation/executor/app/configs.py
@@ -70,6 +70,9 @@ class ExecutorConfig:
     # The content of latest_version.txt would be a single line of
     # '2020_07_15T12_07_17_365264_07_00'.
     storage_version_filename: str = 'latest_version.txt'
+    # Name of the file that contains the import_metadata_mcf for the import.
+    # These files are stored at the same level as the storage_version_filename.
+    import_metadata_mcf_filename: str = 'import_metadata_mcf.mcf'
     # Types of inputs accepted by the Data Commons importer. These are
     # also the accepted fields of an import_inputs value in the manifest.
     import_input_types: List[str] = ('template_mcf', 'cleaned_csv', 'node_mcf')
diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py
index bf49bfeaa4..0b7b9bf011 100644
--- a/import-automation/executor/app/executor/import_executor.py
+++ b/import-automation/executor/app/executor/import_executor.py
@@ -43,6 +43,12 @@
     'Please find logs in the Logs Explorer of the GCP project associated with'
     ' Import Automation.')
 
+_IMPORT_METADATA_MCF_TEMPLATE = """
+Node: dcid:dc/base/{import_name}
+typeOf: dcid:Provenance
+lastDataRefreshDate: "{last_data_refresh_date}"
+"""
+
 
 @dataclasses.dataclass
 class ExecutionResult:
@@ -379,7 +385,7 @@ def _import_one_helper(
             import_dir=absolute_import_dir,
             output_dir=f'{relative_import_dir}/{import_spec["import_name"]}',
             version=version,
-            import_inputs=import_spec.get('import_inputs', []),
+            import_spec=import_spec,
         )
 
         if self.importer:
@@ -410,7 +416,7 @@ def _upload_import_inputs(
         import_dir: str,
         output_dir: str,
         version: str,
-        import_inputs: List[Dict[str, str]],
+        import_spec: dict,
     ) -> import_service.ImportInputs:
         """Uploads the generated import data files.
 
@@ -422,14 +428,13 @@ def _upload_import_inputs(
           import_dir: Absolute path to the directory with the manifest, as a
             string.
          output_dir: Path to the output directory, as a string.
-          import_inputs: List of import inputs each as a dict mapping import types
-            to relative paths within the repository. This is parsed from the
-            'import_inputs' field in the manifest.
+          import_spec: Specification of the import as a dict.
 
        Returns:
          ImportInputs object containing the paths to the uploaded inputs.
        """
        uploaded = import_service.ImportInputs()
+        import_inputs = import_spec.get('import_inputs', [])
        for import_input in import_inputs:
            for input_type in self.config.import_input_types:
                path = import_input.get(input_type)
@@ -443,6 +448,9 @@
        self.uploader.upload_string(
            version,
            os.path.join(output_dir, self.config.storage_version_filename))
+        self.uploader.upload_string(
+            self._import_metadata_mcf_helper(import_spec),
+            os.path.join(output_dir, self.config.import_metadata_mcf_filename))
        return uploaded
 
    def _upload_file_helper(self, src: str, dest: str) -> None:
@@ -454,6 +462,25 @@ def _upload_file_helper(self, src: str, dest: str) -> None:
        """
        self.uploader.upload_file(src, dest)
 
+    def _import_metadata_mcf_helper(self, import_spec: dict) -> str:
+        """Generates the import_metadata_mcf node for an import.
+
+        Args:
+          import_spec: Specification of the import as a dict.
+
+        Returns:
+          The import_metadata_mcf node as a string.
+        """
+        node = _IMPORT_METADATA_MCF_TEMPLATE.format_map({
+            'import_name': import_spec.get('import_name'),
+            'last_data_refresh_date': _clean_date(utils.pacific_time())
+        })
+        next_data_refresh_date = utils.next_pacific_date(
+            import_spec.get('cron_schedule'))
+        if next_data_refresh_date:
+            node += f'nextDataRefreshDate: "{next_data_refresh_date}"\n'
+        return node
+
 
def parse_manifest(path: str) -> dict:
    """Parses the import manifest.
 
@@ -600,7 +627,7 @@ def _run_with_timeout(args: List[str],
        logging.exception(
            f'An unexpected exception was thrown: {e} when running {args}:'
            f' {message}')
-        return None
+        raise e
 
 
def _create_venv(requirements_path: Iterable[str], venv_dir: str,
@@ -691,6 +718,18 @@ def _clean_time(
    return time
 
 
+def _clean_date(time: str) -> str:
+    """Converts an ISO 8601 time string to a YYYY-MM-DD date string.
+
+    Args:
+      time: Time string in ISO 8601 format.
+
+    Returns:
+      The date portion as YYYY-MM-DD.
+    """
+    return time[:10]
+
+
def _construct_process_message(message: str,
                               process: subprocess.CompletedProcess) -> str:
    """Constructs a log message describing the result of a subprocess.
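For illustration, a standalone sketch (not part of the patch) of the node that _import_metadata_mcf_helper assembles. The import name, cron schedule, and dates are invented for the example; the schedule and dates mirror the unit test added to utils_test.py below.

    # Hypothetical import named 'USA_Treasury', refreshed at 00:00 every Friday.
    template = ('Node: dcid:dc/base/{import_name}\n'
                'typeOf: dcid:Provenance\n'
                'lastDataRefreshDate: "{last_data_refresh_date}"\n')
    node = template.format_map({
        'import_name': 'USA_Treasury',            # import_spec.get('import_name')
        'last_data_refresh_date': '2024-12-13',   # _clean_date(utils.pacific_time())
    })
    # Appended only when the manifest provides a cron_schedule.
    node += 'nextDataRefreshDate: "2024-12-20"\n'  # utils.next_pacific_date('0 0 * * FRI')
    print(node)
    # Node: dcid:dc/base/USA_Treasury
    # typeOf: dcid:Provenance
    # lastDataRefreshDate: "2024-12-13"
    # nextDataRefreshDate: "2024-12-20"
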
+ """ + node = _IMPORT_METADATA_MCF_TEMPLATE.format_map({ + "import_name": import_spec.get('import_name'), + "last_data_refresh_date": _clean_date(utils.pacific_time()) + }) + next_data_refresh_date = utils.next_pacific_date( + import_spec.get('cron_schedule')) + if next_data_refresh_date: + node += f'nextDataRefreshDate: "{next_data_refresh_date}"\n' + return node + def parse_manifest(path: str) -> dict: """Parses the import manifest. @@ -600,7 +627,7 @@ def _run_with_timeout(args: List[str], logging.exception( f'An unexpected exception was thrown: {e} when running {args}:' f' {message}') - return None + raise e def _create_venv(requirements_path: Iterable[str], venv_dir: str, @@ -691,6 +718,18 @@ def _clean_time( return time +def _clean_date(time: str) -> str: + """Converts ISO8601 time string to YYYY-MM-DD format. + + Args: + time: Time string in ISO8601 format. + + Returns: + YYYY-MM-DD date. + """ + return time[:10] + + def _construct_process_message(message: str, process: subprocess.CompletedProcess) -> str: """Constructs a log message describing the result of a subprocess. diff --git a/import-automation/executor/app/utils.py b/import-automation/executor/app/utils.py index 12fc196c8b..6d320c62cb 100644 --- a/import-automation/executor/app/utils.py +++ b/import-automation/executor/app/utils.py @@ -19,11 +19,14 @@ import os import re import datetime +from croniter import croniter from typing import List import pytz import requests +_PACIFIC_TIME = 'America/Los_Angeles' + def utctime(): """Returns the current time string in ISO 8601 with timezone UTC+0, e.g. @@ -34,8 +37,28 @@ def utctime(): def pacific_time(): """Returns the current time string in ISO 8601 with timezone America/Los_Angeles, e.g. '2020-06-30T04:28:53.717569-07:00'.""" - return datetime.datetime.now( - pytz.timezone('America/Los_Angeles')).isoformat() + return datetime.datetime.now(pytz.timezone(_PACIFIC_TIME)).isoformat() + + +def next_pacific_date(cron_expression: str, from_time: str = None) -> str: + """Returns the next date from today in ISO8601 with timezone + America/Los_Angeles, given a cron schedule. + + Args: + cron_expression: Expression for cron schedule. + from_time: Optional time to start from. Default is now. + + Returns: + The next date based on the schedule. 
+ """ + try: + if not from_time: + from_time = datetime.datetime.now(pytz.timezone(_PACIFIC_TIME)) + iter = croniter(cron_expression, from_time) + return iter.get_next(datetime.datetime).date().isoformat() + except Exception as e: + print(f"Error calculating next date: {e}") + return "" def list_to_str(a_list: List, sep: str = ', ') -> str: diff --git a/import-automation/executor/requirements.txt b/import-automation/executor/requirements.txt index aaf0962d05..956b49e547 100644 --- a/import-automation/executor/requirements.txt +++ b/import-automation/executor/requirements.txt @@ -11,3 +11,4 @@ flask gunicorn pytz absl-py +croniter diff --git a/import-automation/executor/test/file_uploader_test.py b/import-automation/executor/test/file_uploader_test.py index 9be6b84841..7e2092fe8d 100644 --- a/import-automation/executor/test/file_uploader_test.py +++ b/import-automation/executor/test/file_uploader_test.py @@ -60,9 +60,8 @@ class LocalFileUploaderTest(unittest.TestCase): def test_upload_file(self): with tempfile.TemporaryDirectory() as tmp_dir: uploader = file_uploader.LocalFileUploader(tmp_dir) - src = os.path.join( - os.getcwd(), - 'import-automation/executor/test/data/COVIDTracking_States.csv') + src = os.path.join(os.path.dirname(__file__), + 'data/COVIDTracking_States.csv') uploader.upload_file(src, 'foo/bar/data.csv') self.assertTrue( utils.compare_lines(src, diff --git a/import-automation/executor/test/github_api_test.py b/import-automation/executor/test/github_api_test.py index d3b2a5c61a..394d8bd21f 100644 --- a/import-automation/executor/test/github_api_test.py +++ b/import-automation/executor/test/github_api_test.py @@ -294,7 +294,8 @@ def test_find_dirs_in_commit_empty(self, dir_exists, query_files_in_dir, @mock.patch('requests.get') def test_download_repo(self, get): - tar_path = 'import-automation/executor/test/data/treasury_constant_maturity_rates.tar.gz' + tar_path = os.path.join(os.path.dirname(__file__), + 'data/treasury_constant_maturity_rates.tar.gz') with open(tar_path, 'rb') as tar: headers = {'Content-Disposition': 'attachment; filename=abc'} get.return_value = utils.ResponseMock(200, raw=tar, headers=headers) @@ -307,24 +308,28 @@ def test_download_repo(self, get): file = os.path.join(downloaded, 'treasury_constant_maturity_rates.csv') assert test.utils.compare_lines( - 'import-automation/executor/test/data/treasury_constant_maturity_rates.csv', + os.path.join(os.path.dirname(__file__), + 'data/treasury_constant_maturity_rates.csv'), file, integration_test.NUM_LINES_TO_CHECK) file = os.path.join(downloaded, 'treasury_constant_maturity_rates.mcf') assert test.utils.compare_lines( - 'import-automation/executor/test/data/treasury_constant_maturity_rates.mcf', + os.path.join(os.path.dirname(__file__), + 'data/treasury_constant_maturity_rates.mcf'), file, integration_test.NUM_LINES_TO_CHECK) file = os.path.join(downloaded, 'treasury_constant_maturity_rates.tmcf') assert test.utils.compare_lines( - 'import-automation/executor/test/data/treasury_constant_maturity_rates.tmcf', + os.path.join(os.path.dirname(__file__), + 'data/treasury_constant_maturity_rates.tmcf'), file, integration_test.NUM_LINES_TO_CHECK) @mock.patch('requests.get') def test_download_repo_timeout(self, get): - tar_path = 'import-automation/executor/test/data/treasury_constant_maturity_rates.tar.gz' + tar_path = os.path.join(os.path.dirname(__file__), + 'data/treasury_constant_maturity_rates.tar.gz') with open(tar_path, 'rb') as tar: headers = {'Content-Disposition': 'attachment; filename=abc'} 
diff --git a/import-automation/executor/test/github_api_test.py b/import-automation/executor/test/github_api_test.py
index d3b2a5c61a..394d8bd21f 100644
--- a/import-automation/executor/test/github_api_test.py
+++ b/import-automation/executor/test/github_api_test.py
@@ -294,7 +294,8 @@ def test_find_dirs_in_commit_empty(self, dir_exists, query_files_in_dir,
 
     @mock.patch('requests.get')
     def test_download_repo(self, get):
-        tar_path = 'import-automation/executor/test/data/treasury_constant_maturity_rates.tar.gz'
+        tar_path = os.path.join(os.path.dirname(__file__),
+                                'data/treasury_constant_maturity_rates.tar.gz')
         with open(tar_path, 'rb') as tar:
             headers = {'Content-Disposition': 'attachment; filename=abc'}
             get.return_value = utils.ResponseMock(200, raw=tar, headers=headers)
@@ -307,24 +308,28 @@ def test_download_repo(self, get):
             file = os.path.join(downloaded,
                                 'treasury_constant_maturity_rates.csv')
             assert test.utils.compare_lines(
-                'import-automation/executor/test/data/treasury_constant_maturity_rates.csv',
+                os.path.join(os.path.dirname(__file__),
+                             'data/treasury_constant_maturity_rates.csv'),
                 file, integration_test.NUM_LINES_TO_CHECK)
 
             file = os.path.join(downloaded,
                                 'treasury_constant_maturity_rates.mcf')
             assert test.utils.compare_lines(
-                'import-automation/executor/test/data/treasury_constant_maturity_rates.mcf',
+                os.path.join(os.path.dirname(__file__),
+                             'data/treasury_constant_maturity_rates.mcf'),
                 file, integration_test.NUM_LINES_TO_CHECK)
 
             file = os.path.join(downloaded,
                                 'treasury_constant_maturity_rates.tmcf')
             assert test.utils.compare_lines(
-                'import-automation/executor/test/data/treasury_constant_maturity_rates.tmcf',
+                os.path.join(os.path.dirname(__file__),
+                             'data/treasury_constant_maturity_rates.tmcf'),
                 file, integration_test.NUM_LINES_TO_CHECK)
 
     @mock.patch('requests.get')
     def test_download_repo_timeout(self, get):
-        tar_path = 'import-automation/executor/test/data/treasury_constant_maturity_rates.tar.gz'
+        tar_path = os.path.join(os.path.dirname(__file__),
+                                'data/treasury_constant_maturity_rates.tar.gz')
         with open(tar_path, 'rb') as tar:
             headers = {'Content-Disposition': 'attachment; filename=abc'}
             get.return_value = utils.ResponseMock(200, raw=tar, headers=headers)
@@ -335,7 +340,7 @@ def test_download_repo_timeout(self, get):
 
     @mock.patch('requests.get')
     def test_download_repo_empty(self, get):
-        with open('import-automation/executor/test/data/empty.tar.gz',
+        with open(os.path.join(os.path.dirname(__file__), 'data/empty.tar.gz'),
                   'rb') as tar:
             headers = {'Content-Disposition': 'attachment; filename=abc'}
             get.return_value = utils.ResponseMock(200, raw=tar, headers=headers)
@@ -346,7 +351,8 @@ def test_download_repo_empty(self, get):
 
     @mock.patch('requests.get')
     def test_download_repo_http_error(self, get):
-        tar_path = 'import-automation/executor/test/data/treasury_constant_maturity_rates.tar.gz'
+        tar_path = os.path.join(os.path.dirname(__file__),
+                                'data/treasury_constant_maturity_rates.tar.gz')
         with open(tar_path, 'rb') as tar:
             get.return_value = utils.ResponseMock(400, raw=tar)
 
diff --git a/import-automation/executor/test/import_executor_test.py b/import-automation/executor/test/import_executor_test.py
index 0ceab3fd8c..e49b381ce9 100644
--- a/import-automation/executor/test/import_executor_test.py
+++ b/import-automation/executor/test/import_executor_test.py
@@ -33,6 +33,11 @@ def test_clean_time(self):
             '2020_07_15T12_07_17_365264_07_00',
             import_executor._clean_time('2020-07-15T12:07:17.365264-07:00'))
 
+    def test_clean_date(self):
+        self.assertEqual(
+            '2020-07-15',
+            import_executor._clean_date('2020-07-15T12:07:17.365264+00:00'))
+
     def test_run_with_timeout(self):
         self.assertRaises(subprocess.TimeoutExpired,
                           import_executor._run_with_timeout, ['sleep', '5'],
@@ -62,8 +67,7 @@ def test_run_and_handle_exception(self):
         def raise_exception():
             raise Exception
 
-        result = import_executor.run_and_handle_exception(
-            'run', raise_exception)
+        result = import_executor.run_and_handle_exception(raise_exception)
         self.assertEqual('failed', result.status)
         self.assertEqual([], result.imports_executed)
         self.assertIn('Exception', result.message)
diff --git a/import-automation/executor/test/utils_test.py b/import-automation/executor/test/utils_test.py
index e842e4ee03..cc29475398 100644
--- a/import-automation/executor/test/utils_test.py
+++ b/import-automation/executor/test/utils_test.py
@@ -47,6 +47,16 @@ def test_pacific_time_to_datetime_then_back(self):
         time_datetime = datetime.datetime.fromisoformat(time_iso)
         self.assertEqual(time_iso, time_datetime.isoformat())
 
+    def test_next_pacific_date(self):
+        """Tests next_pacific_date."""
+        # Fires at 00:00 every Friday.
+        cron_expression = '0 0 * * FRI'
+        # 2024-12-13 is a Friday.
+        from_time = datetime.datetime(2024, 12, 13)
+        self.assertEqual(
+            app.utils.next_pacific_date(cron_expression, from_time),
+            '2024-12-20')
+
     def test_download_file(self):
         """Response does not have a Content-Disposition header."""
         url = ('https://www.w3.org/WAI/ER/tests/xhtml/testfiles/resources/'