This proof-of-concept represents a new, standalone version of a data ingestion pipeline that load data downloaded from Socrata into the 311 Data project Postgres database.
The pipeline can be used either to initially populate a new database with one or more years of data or it can be used to get only new or changed records since the last time the tool was run.
The steps in the data ingestion/update process:
- Configure and start the flow
- Download data (by year) from Socrata and save it to CSV files
- Insert the downloaded data to a temporary table in Postgres
- Move the data from the temporary to the requests table
- Update views and vacuum the database
- Write some metadata about the load (and options post to Slack)
- Clear API Redis cache when complete
- Clean out old CSV files (?)
- Create better mocks for tests
- Configure the views to refresh during load
The data comes from the Los Angeles 311 system as exposed though a Socrata instance hosted by the city.
Socrata has a python library called Sodapy which acts as a client for its API and is used in this project.
The engine managing the data ingestion process is Prefect Core. Prefect is a company built around an open source library in a so-called open-core model.
In the "Prefect" idiom, there are tasks which perform an action and are chained together into a flow. Flows can be executed in a number of different ways, but this project is set up to run the flow as a python file via the command line:
To load data into an API database using the Docker image
- Build the 'prefectpoc' docker image
- Run the docker image as follows:
# set the DSN as an environment variable
# run the image interactively
docker run --env PREFECT__CONTEXT__SECRETS__DSN -it prefectpoc
Populating a new database from Socrata requires a few additional settings. They are found in the TOML file but the easiest way to provide them is using environment variables.
# set the DSN as an environment variable
export PREFECT__MODE=full # specify that this is a full data loadd
export PREFECT__RESET_DB=true # to wipe any existing requests data
export PREFECT__DATA__YEARS=["2020","2019"] # the years to load as a string list
# run the image interactively
# or do the above with an environment file and the --env-file argument
To load data into an API database:
- Download this project and cd into the project folder
- Create a virtual environment for the project (e.g. pipenv --python 3.7)
- Do a pip install of the dependencies from the requirements.txt
- Run the flow as follows:
export PREFECT__USER_CONFIG_PATH=./config.toml
The flow supports sending a final status notification to Slack. To configure this, simply add your Slack webhook URL to the configuration secrets like you did for the DSN. (Note: this needs to be treated as a secret since Slack webhooks are otherwise not authenticated.)
Configuration is done via a TOML file called config.toml in the project root. This uses the Prefect mechanism for configuration. The Prefect default is to look for this file in the USER home directory so a special environment variable (PREFECT__USER_CONFIG_PATH) needs to be set in order to discover this in the project root.
The database URL (DSN) secrets are expected to be provided as environment variables. This needs to follow the Prefect convention and be prefixed with 'PREFECT__CONTEXT__SECRETS__' in order for it to be treated by Prefect as a Secret.
- mode: either 'full' whether all records for the matching 'year' setting or 'update' whether to download new and updated records using the 'since' setting
- domain: the path to the Socrata instance (e.g. "")
- dask: setting this to True will run the download steps in parallel
- datasets: a dictionary of available years and Socrata dataset keys
- fields: a dictionary of fields and their Postgres data type (note that this will assume varchar unless specified otherwise)
- key: the field to be used to manage inserts/updates (e.g. "srnumber")
- target: the name of the table to be ultimately loaded (e.g. "requests")
- years: the years to be loaded
- since: will only load records change since this date (note that if since is specified it will load updated data for ALL years)
select pg_size_pretty (pg_database_size('311_db'));
select pg_size_pretty (pg_relation_size('requests'));
select pg_size_pretty (pg_indexes_size('requests'));
-- get the database objects by total size
relname AS "relation",
pg_size_pretty ( pg_total_relation_size (C .oid) ) AS "total_size"
FROM pg_class C
LEFT JOIN pg_namespace N ON (N.oid = C .relnamespace)
WHERE nspname NOT IN ('pg_catalog', 'information_schema')
AND C .relkind <> 'i'
AND nspname !~ '^pg_toast'
ORDER BY pg_total_relation_size (C .oid) DESC
To clear out the Redis cache of an existing API instance:
curl -X POST "http://localhost:5000/status/reset-cache" -H "accept: application/json" -d ""
# build the image
docker build . -t prefectpoc:latest
# login to GitHub packages
cat ~/GH_TOKEN.txt | docker login -u mattyweb --password-stdin
# tag the package
docker tag 0a44395490cc
# push the package to GitHub packages
docker push