Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Produce import_metadata_mcf in executor #1144

Merged
merged 4 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions import-automation/executor/app/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ class ExecutorConfig:
# The content of latest_version.txt would be a single line of
# '2020_07_15T12_07_17_365264_07_00'.
storage_version_filename: str = 'latest_version.txt'
# Name of the file that contains the import_metadata_mcf for the import.
# These files are stored at the same level as the storage_version_filename.
import_metadata_mcf_filename: str = 'import_metadata_mcf.mcf'
# Types of inputs accepted by the Data Commons importer. These are
# also the accepted fields of an import_inputs value in the manifest.
import_input_types: List[str] = ('template_mcf', 'cleaned_csv', 'node_mcf')
Expand Down
51 changes: 45 additions & 6 deletions import-automation/executor/app/executor/import_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@
'Please find logs in the Logs Explorer of the GCP project associated with'
' Import Automation.')

_IMPORT_METADATA_MCF_TEMPLATE = """
Node: dcid:dc/base/{import_name}
typeOf: dcid:Provenance
lastDataRefeshDate: "{last_data_refresh_date}"
"""


@dataclasses.dataclass
class ExecutionResult:
Expand Down Expand Up @@ -379,7 +385,7 @@ def _import_one_helper(
import_dir=absolute_import_dir,
output_dir=f'{relative_import_dir}/{import_spec["import_name"]}',
version=version,
import_inputs=import_spec.get('import_inputs', []),
import_spec=import_spec,
)

if self.importer:
Expand Down Expand Up @@ -410,7 +416,7 @@ def _upload_import_inputs(
import_dir: str,
output_dir: str,
version: str,
import_inputs: List[Dict[str, str]],
import_spec: dict,
) -> import_service.ImportInputs:
"""Uploads the generated import data files.

Expand All @@ -422,14 +428,13 @@ def _upload_import_inputs(
import_dir: Absolute path to the directory with the manifest, as a
string.
output_dir: Path to the output directory, as a string.
import_inputs: List of import inputs each as a dict mapping import types
to relative paths within the repository. This is parsed from the
'import_inputs' field in the manifest.
import_spec: Specification of the import as a dict.

Returns:
ImportInputs object containing the paths to the uploaded inputs.
"""
uploaded = import_service.ImportInputs()
import_inputs = import_spec.get('import_inputs', [])
for import_input in import_inputs:
for input_type in self.config.import_input_types:
path = import_input.get(input_type)
Expand All @@ -443,6 +448,9 @@ def _upload_import_inputs(
self.uploader.upload_string(
version,
os.path.join(output_dir, self.config.storage_version_filename))
self.uploader.upload_string(
self._import_metadata_mcf_helper(import_spec),
os.path.join(output_dir, self.config.import_metadata_mcf_filename))
return uploaded

def _upload_file_helper(self, src: str, dest: str) -> None:
Expand All @@ -454,6 +462,25 @@ def _upload_file_helper(self, src: str, dest: str) -> None:
"""
self.uploader.upload_file(src, dest)

def _import_metadata_mcf_helper(self, import_spec: dict) -> str:
    """Generates the import_metadata_mcf node for an import.

    The node records the last data refresh date (today, Pacific time) and,
    when a cron schedule is available, the next expected refresh date.

    Args:
        import_spec: Specification of the import as a dict.

    Returns:
        import_metadata_mcf node as a string.
    """
    last_refresh = _clean_date(utils.pacific_time())
    node = _IMPORT_METADATA_MCF_TEMPLATE.format(
        import_name=import_spec.get('import_name'),
        last_data_refresh_date=last_refresh)
    next_refresh = utils.next_pacific_date(import_spec.get('cron_schedule'))
    # next_pacific_date returns '' when the cron expression is missing or
    # invalid; only emit the property when a real date came back.
    if next_refresh:
        node += f'nextDataRefreshDate: "{next_refresh}"\n'
    return node


def parse_manifest(path: str) -> dict:
"""Parses the import manifest.
Expand Down Expand Up @@ -600,7 +627,7 @@ def _run_with_timeout(args: List[str],
logging.exception(
f'An unexpected exception was thrown: {e} when running {args}:'
f' {message}')
return None
raise e


def _create_venv(requirements_path: Iterable[str], venv_dir: str,
Expand Down Expand Up @@ -691,6 +718,18 @@ def _clean_time(
return time


def _clean_date(time: str) -> str:
"""Converts ISO8601 time string to YYYY-MM-DD format.

Args:
time: Time string in ISO8601 format.

Returns:
YYYY-MM-DD date.
"""
return time[:10]


def _construct_process_message(message: str,
process: subprocess.CompletedProcess) -> str:
"""Constructs a log message describing the result of a subprocess.
Expand Down
27 changes: 25 additions & 2 deletions import-automation/executor/app/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
import os
import re
import datetime
from croniter import croniter
from typing import List

import pytz
import requests

_PACIFIC_TIME = 'America/Los_Angeles'


def utctime():
"""Returns the current time string in ISO 8601 with timezone UTC+0, e.g.
Expand All @@ -34,8 +37,28 @@ def utctime():
def pacific_time():
    """Returns the current time string in ISO 8601 with timezone
    America/Los_Angeles, e.g. '2020-06-30T04:28:53.717569-07:00'."""
    # NOTE(review): the scraped diff showed both the pre- and post-change
    # return statements; this is the post-change version using the
    # _PACIFIC_TIME module constant.
    return datetime.datetime.now(pytz.timezone(_PACIFIC_TIME)).isoformat()


def next_pacific_date(cron_expression: str,
                      from_time: datetime.datetime = None) -> str:
    """Returns the next date from today in ISO8601 with timezone
    America/Los_Angeles, given a cron schedule.

    Args:
        cron_expression: Expression for cron schedule.
        from_time: Optional datetime to start from. Defaults to the current
            time in America/Los_Angeles.

    Returns:
        The next date based on the schedule in YYYY-MM-DD format, or '' if
        the next date could not be computed (e.g. invalid cron expression).
    """
    try:
        if not from_time:
            from_time = datetime.datetime.now(pytz.timezone(_PACIFIC_TIME))
        # Renamed from 'iter', which shadowed the builtin of the same name.
        schedule_iter = croniter(cron_expression, from_time)
        return schedule_iter.get_next(datetime.datetime).date().isoformat()
    except Exception as e:
        # Best-effort: a bad or missing schedule yields an empty string
        # instead of failing the caller.
        print(f"Error calculating next date: {e}")
        return ""


def list_to_str(a_list: List, sep: str = ', ') -> str:
Expand Down
1 change: 1 addition & 0 deletions import-automation/executor/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ flask
gunicorn
pytz
absl-py
croniter
5 changes: 2 additions & 3 deletions import-automation/executor/test/file_uploader_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,8 @@ class LocalFileUploaderTest(unittest.TestCase):
def test_upload_file(self):
with tempfile.TemporaryDirectory() as tmp_dir:
uploader = file_uploader.LocalFileUploader(tmp_dir)
src = os.path.join(
os.getcwd(),
'import-automation/executor/test/data/COVIDTracking_States.csv')
src = os.path.join(os.path.dirname(__file__),
'data/COVIDTracking_States.csv')
uploader.upload_file(src, 'foo/bar/data.csv')
self.assertTrue(
utils.compare_lines(src,
Expand Down
20 changes: 13 additions & 7 deletions import-automation/executor/test/github_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,8 @@ def test_find_dirs_in_commit_empty(self, dir_exists, query_files_in_dir,

@mock.patch('requests.get')
def test_download_repo(self, get):
tar_path = 'import-automation/executor/test/data/treasury_constant_maturity_rates.tar.gz'
tar_path = os.path.join(os.path.dirname(__file__),
'data/treasury_constant_maturity_rates.tar.gz')
with open(tar_path, 'rb') as tar:
headers = {'Content-Disposition': 'attachment; filename=abc'}
get.return_value = utils.ResponseMock(200, raw=tar, headers=headers)
Expand All @@ -307,24 +308,28 @@ def test_download_repo(self, get):
file = os.path.join(downloaded,
'treasury_constant_maturity_rates.csv')
assert test.utils.compare_lines(
'import-automation/executor/test/data/treasury_constant_maturity_rates.csv',
os.path.join(os.path.dirname(__file__),
'data/treasury_constant_maturity_rates.csv'),
file, integration_test.NUM_LINES_TO_CHECK)

file = os.path.join(downloaded,
'treasury_constant_maturity_rates.mcf')
assert test.utils.compare_lines(
'import-automation/executor/test/data/treasury_constant_maturity_rates.mcf',
os.path.join(os.path.dirname(__file__),
'data/treasury_constant_maturity_rates.mcf'),
file, integration_test.NUM_LINES_TO_CHECK)

file = os.path.join(downloaded,
'treasury_constant_maturity_rates.tmcf')
assert test.utils.compare_lines(
'import-automation/executor/test/data/treasury_constant_maturity_rates.tmcf',
os.path.join(os.path.dirname(__file__),
'data/treasury_constant_maturity_rates.tmcf'),
file, integration_test.NUM_LINES_TO_CHECK)

@mock.patch('requests.get')
def test_download_repo_timeout(self, get):
tar_path = 'import-automation/executor/test/data/treasury_constant_maturity_rates.tar.gz'
tar_path = os.path.join(os.path.dirname(__file__),
'data/treasury_constant_maturity_rates.tar.gz')
with open(tar_path, 'rb') as tar:
headers = {'Content-Disposition': 'attachment; filename=abc'}
get.return_value = utils.ResponseMock(200, raw=tar, headers=headers)
Expand All @@ -335,7 +340,7 @@ def test_download_repo_timeout(self, get):

@mock.patch('requests.get')
def test_download_repo_empty(self, get):
with open('import-automation/executor/test/data/empty.tar.gz',
with open(os.path.join(os.path.dirname(__file__), 'data/empty.tar.gz'),
'rb') as tar:
headers = {'Content-Disposition': 'attachment; filename=abc'}
get.return_value = utils.ResponseMock(200, raw=tar, headers=headers)
Expand All @@ -346,7 +351,8 @@ def test_download_repo_empty(self, get):

@mock.patch('requests.get')
def test_download_repo_http_error(self, get):
tar_path = 'import-automation/executor/test/data/treasury_constant_maturity_rates.tar.gz'
tar_path = os.path.join(os.path.dirname(__file__),
'data/treasury_constant_maturity_rates.tar.gz')
with open(tar_path, 'rb') as tar:
get.return_value = utils.ResponseMock(400, raw=tar)

Expand Down
8 changes: 6 additions & 2 deletions import-automation/executor/test/import_executor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ def test_clean_time(self):
'2020_07_15T12_07_17_365264_07_00',
import_executor._clean_time('2020-07-15T12:07:17.365264-07:00'))

def test_clean_date(self):
    """_clean_date truncates an ISO8601 timestamp to its date part."""
    actual = import_executor._clean_date('2020-07-15T12:07:17.365264+00:00')
    self.assertEqual('2020-07-15', actual)

def test_run_with_timeout(self):
self.assertRaises(subprocess.TimeoutExpired,
import_executor._run_with_timeout, ['sleep', '5'],
Expand Down Expand Up @@ -62,8 +67,7 @@ def test_run_and_handle_exception(self):
def raise_exception():
raise Exception

result = import_executor.run_and_handle_exception(
'run', raise_exception)
result = import_executor.run_and_handle_exception(raise_exception)
self.assertEqual('failed', result.status)
self.assertEqual([], result.imports_executed)
self.assertIn('Exception', result.message)
Expand Down
10 changes: 10 additions & 0 deletions import-automation/executor/test/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ def test_pacific_time_to_datetime_then_back(self):
time_datetime = datetime.datetime.fromisoformat(time_iso)
self.assertEqual(time_iso, time_datetime.isoformat())

def test_next_pacific_date(self):
    """Tests next_pacific_date."""
    # Schedule fires at 00:00 every Friday.
    weekly_friday = '0 0 * * FRI'
    # 2024-12-13 is itself a Friday, so the next run is one week later.
    start = datetime.datetime(2024, 12, 13)
    self.assertEqual('2024-12-20',
                     app.utils.next_pacific_date(weekly_friday, start))

def test_download_file(self):
"""Response does not have a Content-Disposition header."""
url = ('https://www.w3.org/WAI/ER/tests/xhtml/testfiles/resources/'
Expand Down
Loading