Skip to content

Commit

Permalink
adding new year datasource
Browse files Browse the repository at this point in the history
  • Loading branch information
mattyweb committed Jan 20, 2021
1 parent ad02671 commit e9a64fc
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 3 deletions.
24 changes: 24 additions & 0 deletions server/prefect/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,30 @@ The database URL (DSN) secrets are expected to be provided as environment variab
* years: the years to be loaded
* since: will only load records change since this date (note that if since is specified it will load updated data for ALL years)

## Troubleshooting

### Prefect Output Logs

The first thing to look at when running the prefect tasks is you should see an output like the following in your shell.

```bash
$ docker-compose run prefect python flow.py

Creating 311_data_prefect_run ... done
[2021-01-20 22:02:11] INFO - prefect | Starting update flow for 2019, 2020, 2021
[2021-01-20 22:02:11] INFO - prefect.FlowRunner | Beginning Flow run for 'Loading Socrata data to Postgres'
[2021-01-20 22:02:11] INFO - prefect.TaskRunner | Task 'get_start_datetime': Starting task run...
[2021-01-20 22:02:11] INFO - prefect.TaskRunner | Task 'datasets': Starting task run...
[2021-01-20 22:02:11] INFO - prefect.TaskRunner | Task 'datasets': finished task run for task with final state: 'Success'
[2021-01-20 22:02:27] INFO - prefect.get_start_datetime | 2021-01-20 10:59:40
[2021-01-20 22:02:28] INFO - prefect.TaskRunner | Task 'get_start_datetime': finished task run for task with final state: 'Success'

```

In the example above, the ```prefect | Starting update flow``` will show you the years the flow is trying to load and the ```prefect.get_start_datetime``` will show you the date the flow uses as the last modified date for new data to be loaded.

Other log lines will show the number of records inserted versus updated as well as any errors in the process.

## Helpful Postgres commands to watch your database

```sql
Expand Down
4 changes: 3 additions & 1 deletion server/prefect/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ token = "6b5lwk1jHSQTgx7PAVFKOOdt2"
2018 = "h65r-yf5i"
2019 = "pvft-t768"
2020 = "rq3b-xjk8"
2021 = "97z7-y5bt"

[data]
# years to load
Expand All @@ -35,7 +36,8 @@ years = [
2017,
2018,
2019,
2020
2020,
2021
]
# name of table to load
target = "requests"
Expand Down
2 changes: 1 addition & 1 deletion server/prefect/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
datasets = Parameter("datasets")

# get last updated from database
since = postgres.get_last_updated()
since = postgres.get_start_datetime()
# download dataset from Socrata
downloads = socrata.download_dataset.map(
dataset=datasets,
Expand Down
5 changes: 4 additions & 1 deletion server/prefect/tasks/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ def infer_types(fields: Dict[str, str]) -> Dict[str, str]:


@task
def get_last_updated() -> datetime:
def get_start_datetime() -> datetime:
if hasattr(prefect.config.data, "start_datetime"):
return datetime.fromisoformat(prefect.config.data.start_datetime)

logger = prefect.context.get("logger")
target = prefect.config.data.target
recent_column = prefect.config.data.recent_column
Expand Down

0 comments on commit e9a64fc

Please sign in to comment.