Skip to content

Commit

Permalink
Merge pull request #897 from hackforla/feat-2021-dataset-addition
Browse files Browse the repository at this point in the history
adding new year datasource
  • Loading branch information
mattyweb authored Jan 20, 2021
2 parents 7055df4 + e9a64fc commit 34b0f31
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 3 deletions.
24 changes: 24 additions & 0 deletions server/prefect/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,30 @@ The database URL (DSN) secrets are expected to be provided as environment variab
* years: the years to be loaded
* since: will only load records change since this date (note that if since is specified it will load updated data for ALL years)

## Troubleshooting

### Prefect Output Logs

The first thing to look at when running the prefect tasks is you should see an output like the following in your shell.

```bash
$ docker-compose run prefect python flow.py

Creating 311_data_prefect_run ... done
[2021-01-20 22:02:11] INFO - prefect | Starting update flow for 2019, 2020, 2021
[2021-01-20 22:02:11] INFO - prefect.FlowRunner | Beginning Flow run for 'Loading Socrata data to Postgres'
[2021-01-20 22:02:11] INFO - prefect.TaskRunner | Task 'get_start_datetime': Starting task run...
[2021-01-20 22:02:11] INFO - prefect.TaskRunner | Task 'datasets': Starting task run...
[2021-01-20 22:02:11] INFO - prefect.TaskRunner | Task 'datasets': finished task run for task with final state: 'Success'
[2021-01-20 22:02:27] INFO - prefect.get_start_datetime | 2021-01-20 10:59:40
[2021-01-20 22:02:28] INFO - prefect.TaskRunner | Task 'get_start_datetime': finished task run for task with final state: 'Success'

```

In the example above, the ```prefect | Starting update flow``` will show you the years the flow is trying to load and the ```prefect.get_start_datetime``` will show you the date the flow uses as the last modified date for new data to be loaded.

Other log lines will show the number of records inserted versus updated as well as any errors in the process.

## Helpful Postgres commands to watch your database

```sql
Expand Down
4 changes: 3 additions & 1 deletion server/prefect/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ token = "6b5lwk1jHSQTgx7PAVFKOOdt2"
2018 = "h65r-yf5i"
2019 = "pvft-t768"
2020 = "rq3b-xjk8"
2021 = "97z7-y5bt"

[data]
# years to load
Expand All @@ -35,7 +36,8 @@ years = [
2017,
2018,
2019,
2020
2020,
2021
]
# name of table to load
target = "requests"
Expand Down
2 changes: 1 addition & 1 deletion server/prefect/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
datasets = Parameter("datasets")

# get last updated from database
since = postgres.get_last_updated()
since = postgres.get_start_datetime()
# download dataset from Socrata
downloads = socrata.download_dataset.map(
dataset=datasets,
Expand Down
5 changes: 4 additions & 1 deletion server/prefect/tasks/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ def infer_types(fields: Dict[str, str]) -> Dict[str, str]:


@task
def get_last_updated() -> datetime:
def get_start_datetime() -> datetime:
if hasattr(prefect.config.data, "start_datetime"):
return datetime.fromisoformat(prefect.config.data.start_datetime)

logger = prefect.context.get("logger")
target = prefect.config.data.target
recent_column = prefect.config.data.recent_column
Expand Down

0 comments on commit 34b0f31

Please sign in to comment.