Commit 70e507b

Invoke validations from import executor

vish-cs committed Dec 16, 2024
1 parent d8e35d9
Showing 3 changed files with 132 additions and 36 deletions.
6 changes: 6 additions & 0 deletions import-automation/executor/app/configs.py
@@ -108,6 +108,12 @@ class ExecutorConfig:
user_script_args: List[str] = ()
# Environment variables for the user script
user_script_env: dict = None
# Skip uploading the data to GCS (for local testing).
skip_gcs_upload: bool = True
# Invoke validations before upload.
invoke_import_validation: bool = True
# Import validation config file.
validation_config_file: str = 'tools/validation/config.json'
# Maximum time venv creation can take in seconds.
venv_create_timeout: float = 3600
# Maximum time downloading a file can take in seconds.
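The three new flags gate validation and upload behavior during local testing. A minimal usage sketch, assuming ExecutorConfig is a dataclass that accepts keyword overrides (the field syntax above suggests it is; this construction is not shown in the diff):

    from app.configs import ExecutorConfig

    # Assumed constructor usage for a production-style run.
    config = ExecutorConfig(
        skip_gcs_upload=False,          # actually upload outputs to GCS
        invoke_import_validation=True,  # run differ + validation before upload
        validation_config_file='tools/validation/config.json',
    )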
159 changes: 123 additions & 36 deletions import-automation/executor/app/executor/import_executor.py
Expand Up @@ -34,6 +34,7 @@
from app.service import file_uploader
from app.service import github_api
from app.service import import_service
from google.cloud import storage

# Email address for status messages.
_DEBUG_EMAIL_ADDR = '[email protected]'
@@ -283,6 +284,8 @@ def _import_one(
import_spec: Specification of the import as a dict.
"""
import_name = import_spec['import_name']
curator_emails = import_spec['curator_emails']
dc_email_aliases = [_ALERT_EMAIL_ADDR, _DEBUG_EMAIL_ADDR]
absolute_import_name = import_target.get_absolute_import_name(
relative_import_dir, import_name)
time_start = time.time()
@@ -294,14 +297,8 @@
import_spec=import_spec,
)
time_taken = '{0:.2f}'.format(time.time() - time_start)
if self.notifier:
msg = f'Successful Import: {import_name} ({absolute_import_name})\n\n'
msg += f'Script execution time taken = {time_taken}s'
self.notifier.send(
subject=f'Import Automation Success - {import_name}',
body=msg,
receiver_addresses=[_DEBUG_EMAIL_ADDR],
)
logging.info(f'Import Automation Success - {import_name}')
logging.info(f'Script execution time taken = {time_taken}s')

except Exception as exc:
if self.notifier:
@@ -312,10 +309,105 @@
self.notifier.send(
subject=f'Import Automation Failure - {import_name}',
body=msg,
receiver_addresses=[_ALERT_EMAIL_ADDR, _DEBUG_EMAIL_ADDR],
# receiver_addresses=[_ALERT_EMAIL_ADDR, _DEBUG_EMAIL_ADDR],
receiver_addresses=['[email protected]'],
)
raise exc

def _invoke_import_validation(
self,
repo_dir: str,
relative_import_dir: str,
absolute_import_dir: str,
import_spec: dict,
interpreter_path: str,
process: subprocess.CompletedProcess
) -> None:
"""
Performs validations on import data.
"""
import_inputs = import_spec.get('import_inputs', [])
for import_input in import_inputs:
# inputs = json.loads(import_input)
path = import_input['node_mcf']
current_data_path = os.path.join(absolute_import_dir, path)
previous_data_path = os.path.join(absolute_import_dir, path) + '.old'
differ_results_path = os.path.join(absolute_import_dir, 'results')
config_file_path = os.path.join(absolute_import_dir, self.config.validation_config_file)

# Download previous import data.
bucket = storage.Client(self.config.gcs_project_id).bucket(self.config.storage_prod_bucket_name)
folder = relative_import_dir + '/' + import_spec['import_name'] + '/'
blob = bucket.blob(folder + 'latest_version.txt')
blob = bucket.blob(folder + blob.download_as_text() + '/' + path)
blob.download_to_filename(previous_data_path)
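# Note: 'latest_version.txt' is a pointer blob whose text content names
# the most recent version folder; the previous data is then fetched from
# '<import folder>/<version>/<node_mcf path>'.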

# Invoke data differ script.
differ_script_path = os.path.join(repo_dir, 'tools', 'differ', 'differ.py')
differ_script_args: List[str] = ('--current_data=' + current_data_path,
'--previous_data=' + previous_data_path,
'--output_location=' + differ_results_path)
process = _run_user_script(
interpreter_path=interpreter_path,
script_path=differ_script_path,
timeout=self.config.user_script_timeout,
args=differ_script_args,
cwd=absolute_import_dir,
env=self.config.user_script_env,
)
_log_process(process=process)
process.check_returncode()

# Invoke data validation script.
validation_script_path = os.path.join(repo_dir, 'tools', 'validation', 'validation.py')
validation_script_args: List[str] = ('--differ_output_location=' + differ_results_path,
'--config_file=' + config_file_path)
process = _run_user_script(
interpreter_path=interpreter_path,
script_path=validation_script_path,
timeout=self.config.user_script_timeout,
args=validation_script_args,
cwd=absolute_import_dir,
env=self.config.user_script_env,
)
_log_process(process=process)
process.check_returncode()

def _invoke_import_job(
self,
absolute_import_dir: str,
import_spec: dict,
version: str,
interpreter_path: str,
process: subprocess.CompletedProcess
) -> None:
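"""
Runs each import script, either as a Cloud Run simple-import job or
locally via the user script runner.
"""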
script_paths = import_spec.get('scripts')
for path in script_paths:
script_path = os.path.join(absolute_import_dir, path)
simple_job = cloud_run_simple_import.get_simple_import_job_id(
import_spec, script_path)
if simple_job:
# Running simple import as cloud run job.
cloud_run_simple_import.cloud_run_simple_import_job(
import_spec=import_spec,
config_file=script_path,
env=self.config.user_script_env,
version=version,
image=import_spec.get('image'),
)
else:
# Run import script locally.
process = _run_user_script(
interpreter_path=interpreter_path,
script_path=script_path,
timeout=self.config.user_script_timeout,
args=self.config.user_script_args,
cwd=absolute_import_dir,
env=self.config.user_script_env,
)
_log_process(process=process)
process.check_returncode()

def _import_one_helper(
self,
repo_dir: str,
@@ -348,33 +440,26 @@ def _import_one_helper(
_log_process(process=process)
process.check_returncode()

script_paths = import_spec.get('scripts')
for path in script_paths:
script_path = os.path.join(absolute_import_dir, path)
simple_job = cloud_run_simple_import.get_simple_import_job_id(
import_spec, script_path)
if simple_job:
# Running simple import as cloud run job.
cloud_run_simple_import.cloud_run_simple_import_job(
import_spec=import_spec,
config_file=script_path,
env=self.config.user_script_env,
version=version,
image=import_spec.get('image'),
)
else:
# Run import script locally.
process = _run_user_script(
interpreter_path=interpreter_path,
script_path=script_path,
timeout=self.config.user_script_timeout,
args=self.config.user_script_args,
cwd=absolute_import_dir,
env=self.config.user_script_env,
)
_log_process(process=process)
process.check_returncode()
self._invoke_import_job(
absolute_import_dir=absolute_import_dir,
import_spec=import_spec,
version=version,
interpreter_path=interpreter_path,
process=process
)

if self.config.invoke_import_validation:
self._invoke_import_validation(
repo_dir=repo_dir,
relative_import_dir=relative_import_dir,
absolute_import_dir=absolute_import_dir,
import_spec=import_spec,
interpreter_path=interpreter_path,
process=process
)

if self.config.skip_gcs_upload:
return
inputs = self._upload_import_inputs(
import_dir=absolute_import_dir,
output_dir=f'{relative_import_dir}/{import_spec["import_name"]}',
@@ -384,7 +469,7 @@

if self.importer:
self.importer.delete_previous_output(relative_import_dir,
import_spec)
try:
self.importer.delete_import(
relative_import_dir,
Expand Down Expand Up @@ -455,6 +540,7 @@ def _upload_file_helper(self, src: str, dest: str) -> None:
self.uploader.upload_file(src, dest)



def parse_manifest(path: str) -> dict:
"""Parses the import manifest.
@@ -539,6 +625,7 @@ def _run_with_timeout_async(args: List[str],

end_time = time.time()

process.wait()
return_code = process.returncode
end_msg = (
f'Completed script: "{args}", Return code: {return_code}, time:'
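Taken together, the validation hook added in this file reduces to two script invocations. A standalone sketch with the same flags the executor passes; the interpreter and data paths here are placeholders, not values from the diff:

    import subprocess

    # Differ: compare current vs. previous import data.
    subprocess.run([
        'python', 'tools/differ/differ.py',
        '--current_data=data.mcf',
        '--previous_data=data.mcf.old',
        '--output_location=results',
    ], check=True)

    # Validation: evaluate the differ output against the config.
    subprocess.run([
        'python', 'tools/validation/validation.py',
        '--differ_output_location=results',
        '--config_file=tools/validation/config.json',
    ], check=True)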
3 changes: 3 additions & 0 deletions import-automation/executor/schedule_update_import.py
@@ -313,6 +313,9 @@ def main(_):
logging.info("*************************************************")
res = dataclasses.asdict(
update(cfg, absolute_import_path, local_repo_dir=repo_dir))
logging.info('Update completed with status: %s', res['status'])
if res['status'] != 'succeeded':
return
logging.info("*************************************************")
logging.info("*********** Update Complete. ********************")
logging.info("*************************************************")
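The added guard skips the completion banner when the update fails. A minimal sketch of the check, assuming update() returns a dataclass whose status field is 'succeeded' on success (the asdict() call above implies a dataclass result; the exact shape is not shown in the diff):

    import dataclasses

    @dataclasses.dataclass
    class UpdateResult:  # assumed shape; only 'status' is read here
        status: str

    res = dataclasses.asdict(UpdateResult(status='failed'))
    if res['status'] != 'succeeded':
        ...  # early return: the 'Update Complete' banner is never logged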
