diff --git a/server/prefect/flow.py b/server/prefect/flow.py index b03242adf..2f00cdcf2 100644 --- a/server/prefect/flow.py +++ b/server/prefect/flow.py @@ -1,3 +1,5 @@ +import sys + import prefect from prefect import Flow, Parameter from prefect.engine.executors import LocalDaskExecutor, LocalExecutor @@ -54,17 +56,16 @@ all_datasets = dict(prefect.config.socrata.datasets) years = list(prefect.config.data.years) - # use only year datasets if in full mode otherwise use all w/since + # use only datasets for configured years run_datasets = dict((k, all_datasets[str(k)]) for k in years) - # if mode == 'full': - # run_datasets = dict((k, all_datasets[str(k)]) for k in years) - # else: - # run_datasets = all_datasets - logger.info(f"Starting \"{mode}\" flow for {', '.join(map(str, run_datasets.keys()))}" f" {'and resetting db' if reset_db else ''}") + state = flow.run( datasets=list(run_datasets.values()), executor=LocalDaskExecutor() if dask else LocalExecutor() ) + + if state.is_finished() and state.is_failed(): + sys.exit(1) # exit with error if flow state is failed