-
Notifications
You must be signed in to change notification settings - Fork 2
/
flow.py
75 lines (60 loc) · 2.19 KB
/
flow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import os
import prefect
from prefect import Flow, Parameter
from prefect.engine.results import LocalResult
from prefect.engine.executors import LocalDaskExecutor, LocalExecutor
from prefect.utilities.edges import unmapped
from tasks import postgres, socrata
"""
Flow: Loading Socrata Data to Postgres
--------------------------------------
This flow downloads data from the Socrata Open Data site and loads it
into a Postgres database.
Behavior is configured in the config.toml
"""
# Flow definition (Prefect 1.x functional API): task calls inside this
# `with` block are *registered* on the Flow, not executed immediately.
with Flow(
    'Loading Socrata data to Postgres',
    # Persist each task's result as a local file under ./results next to
    # this module. validate_dir=True — presumably checks/creates the
    # directory; confirm against the LocalResult API docs.
    result=LocalResult(
        dir=os.path.join(os.path.dirname(__file__), "results"),
        validate_dir=True
    ),
    # Flow-level state handler implemented in the postgres task module;
    # NOTE(review): must match Prefect's (obj, old_state, new_state)
    # handler signature — defined outside this file, not verifiable here.
    state_handlers=[postgres.log_to_database]
) as flow:
    # Runtime parameter: list of Socrata dataset identifiers, supplied by
    # flow.run(datasets=...) in the __main__ block below.
    datasets = Parameter("datasets")
    # get last updated from database
    since = postgres.get_last_updated()
    # download dataset from Socrata — one mapped task per dataset; `since`
    # is unmapped so the same value is shared by every mapped download
    downloads = socrata.download_dataset.map(
        dataset=datasets,
        since=unmapped(since)
    )
    # get the temp tables ready for load
    prep = postgres.prep_load()
    # load each downloaded file (one mapped task per downloaded datafile)
    load = postgres.load_datafile.map(
        datafile=downloads
    )
    # commit new data to database and clean up
    complete = postgres.complete_load()
    # Explicit ordering edges: these tasks exchange no data, so the
    # dependencies must be declared by hand.
    # make sure prep runs before load
    flow.add_edge(upstream_task=prep, downstream_task=load)
    # make sure load runs before complete
    flow.add_edge(upstream_task=load, downstream_task=complete)
if __name__ == "__main__":
    # Pull runtime settings (sourced from config.toml via Prefect's
    # config loader) and the context logger.
    log = prefect.context.get("logger")
    cfg = prefect.config
    use_dask = cfg.dask
    mode = cfg.mode
    reset_db = cfg.reset_db
    all_datasets = dict(cfg.socrata.datasets)
    years = list(cfg.data.years)
    # A "full" run restricts the load to the per-year datasets; any other
    # mode runs every configured dataset (incrementally, using "since").
    if mode == 'full':
        run_datasets = {k: all_datasets[k] for k in years}
    else:
        run_datasets = all_datasets
    log.info(f"Starting \"{mode}\" flow for {', '.join(run_datasets.keys())}"
             f" {'and resetting db' if reset_db else ''}")
    # Choose the executor up front, then kick off the flow.
    executor = LocalDaskExecutor() if use_dask else LocalExecutor()
    state = flow.run(
        datasets=list(run_datasets.values()),
        executor=executor
    )