SQL Ingestion functionality updates #177

Merged · 3 commits · Jan 15, 2020
Changes from all commits
139 changes: 114 additions & 25 deletions server/src/services/sqlIngest.py
@@ -1,11 +1,14 @@
import os
from sqlalchemy.types import Integer, Text, String, DateTime, Float
from sqlalchemy import create_engine
import sqlalchemy as db
import pandas as pd
from configparser import ConfigParser # For configparser compatible formatting see: https://docs.python.org/3/library/configparser.html
import numpy as np
import logging
import io
from sodapy import Socrata
import time

class DataHandler:
def __init__(self, config=None, configFilePath=None, separator=','):
@@ -15,6 +18,11 @@ def __init__(self, config=None, configFilePath=None, separator=','):
self.filePath = None
self.configFilePath = configFilePath
self.separator = separator
self.fields = ['srnumber', 'createddate', 'updateddate', 'actiontaken', 'owner', 'requesttype', 'status', 'requestsource',
'createdbyuserorganization', 'mobileos', 'anonymous', 'assignto', 'servicedate', 'closeddate',
'addressverified', 'approximateaddress', 'address', 'housenumber', 'direction', 'streetname',
'suffix', 'zipcode', 'latitude', 'longitude', 'location', 'tbmpage', 'tbmcolumn', 'tbmrow',
'apc', 'cd', 'cdmember', 'nc', 'ncname', 'policeprecinct']


def loadConfig(self, configFilePath):
@@ -29,6 +37,7 @@ def loadConfig(self, configFilePath):
config.read(configFilePath)
self.config = config
self.dbString = config['Database']['DB_CONNECTION_STRING']
self.token = None if config['Socrata']['TOKEN'] == 'None' else config['Socrata']['TOKEN']
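
For reference, a minimal sketch of the configparser logic this relies on. The Socrata values mirror settings.example.cfg further down in this PR; the database connection string is an illustrative placeholder, not a real value:

    from configparser import ConfigParser

    # Hypothetical minimal config mirroring server/src/settings.example.cfg;
    # the DB connection string below is a placeholder.
    config = ConfigParser()
    config.read_string("""
    [Database]
    DB_CONNECTION_STRING = postgresql://user:password@localhost:5432/la311

    [Socrata]
    TOKEN = None
    DOMAIN = data.lacity.org
    AP2019 = pvft-t768
    """)

    # Same pattern as loadConfig: the literal string 'None' means "no app token",
    # which sodapy accepts (unauthenticated requests are simply throttled).
    token = None if config['Socrata']['TOKEN'] == 'None' else config['Socrata']['TOKEN']
    dbString = config['Database']['DB_CONNECTION_STRING']
    print(token, dbString)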


def loadData(self, fileName="2018_mini"):
@@ -79,41 +88,50 @@ def loadData(self, fileName="2018_mini"):
'PolicePrecinct':str
})


def elapsedTimer(self, timeVal):
'''Simple timer method to report on elapsed time for each method'''
return (time.time() - timeVal) / 60


def cleanData(self):
'''Perform general data filtering'''
print('Cleaning 311 dataset...')
print('Cleaning data...')
cleanTimer = time.time()
data = self.data
zipIndex = (data['ZipCode'].str.isdigit()) | (data['ZipCode'].isna())
data['ZipCode'].loc[~zipIndex] = np.nan
zipIndex = (data['zipcode'].str.isdigit()) | (data['zipcode'].isna())
data['zipcode'].loc[~zipIndex] = np.nan
# Format dates as datetime (Time intensive)
data['CreatedDate'] = pd.to_datetime(data['CreatedDate'])
data['ClosedDate'] = pd.to_datetime(data['ClosedDate'])
data['ServiceDate'] = pd.to_datetime(data['ServiceDate'])
# New columns: closed_created, service_created
# xNOTE: SQLAlchemy/Postgres will convert these time deltas to integer values
# May wish to change these to a different format or compute after fact
# data['closed_created'] = data.ClosedDate-data.CreatedDate
# data['service_created'] = data.ServiceDate-data.CreatedDate
# drop NA values and reformat closed_created in units of hours
# data = data[~data.closed_created.isna()]
# New column: closed_created in units of days
# data['closed_createdD'] = data.closed_created / pd.Timedelta(days=1)
# xFUTURE: Geolocation/time clustering to weed out repeat requests
# xFUTURE: Decide whether ServiceDate or ClosedDate are primary metric
# xFUTURE: Removal of feedback and other categories
data['createddate'] = pd.to_datetime(data['createddate'])
data['closeddate'] = pd.to_datetime(data['closeddate'])
data['servicedate'] = pd.to_datetime(data['servicedate'])
data['location'] = data.location.astype(str)
# Check for column consistency
for f in self.fields:
if f not in self.data.columns:
print('\tcolumn %s missing - substituting NaN values' % f)
data[f] = np.NaN
for f in data.columns:
if f not in self.fields:
print('\tcolumn %s not in defined set - dropping column' % f)
data = data[self.fields]
# self.data = self.data.drop(f)
self.data = data
print('\tCleaning Complete: %.1f minutes' % self.elapsedTimer(cleanTimer))
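
To illustrate the column reconciliation and zip-code check above, a small self-contained sketch follows; the field list is truncated and the sample rows are made up:

    import numpy as np
    import pandas as pd

    # Illustrative subset of DataHandler.fields; the real list is defined in __init__.
    fields = ['srnumber', 'createddate', 'zipcode', 'location']

    df = pd.DataFrame({'srnumber': ['1-123'],
                       'zipcode': ['9OO12'],       # malformed zip (letter O's)
                       'extra_col': ['dropped']})

    # Same idea as cleanData: add any missing canonical column as NaN,
    # then select only the canonical columns, which drops extras.
    for f in fields:
        if f not in df.columns:
            df[f] = np.nan
    df = df[fields]

    # Non-numeric zip codes are nulled out, as in the zipcode check above.
    zip_ok = df['zipcode'].str.isdigit() | df['zipcode'].isna()
    df.loc[~zip_ok, 'zipcode'] = np.nan
    print(df)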


def ingestData(self):
def ingestData(self, ingestMethod='replace'):
'''Set up connection to database'''
print('Inserting data into Postgres instance...')
ingestTimer = time.time()
data = self.data.copy() # shard deepcopy to allow other endpoint operations
engine = create_engine(self.dbString)
engine = db.create_engine(self.dbString)
newColumns = [column.replace(' ', '_').lower() for column in data]
data.columns = newColumns
# Ingest data
data.to_sql("ingest_staging_table",
engine,
if_exists='replace',
if_exists=ingestMethod,
schema='public',
index=False,
chunksize=10000,
@@ -151,9 +169,11 @@ def ingestData(self):
'nc':Float,
'ncname':String,
'policeprecinct':String})
print('\tIngest Complete: %.1f minutes' % self.elapsedTimer(ingestTimer))
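
A self-contained sketch of the replace-then-append behaviour that the new ingestMethod argument enables. SQLite stands in for the Postgres instance here, so the schema argument is omitted; table contents are illustrative:

    import pandas as pd
    import sqlalchemy as db
    from sqlalchemy.types import String, Float

    engine = db.create_engine('sqlite://')  # in-memory stand-in for Postgres

    batch_2015 = pd.DataFrame({'srnumber': ['1-1'], 'latitude': [34.05]})
    batch_2016 = pd.DataFrame({'srnumber': ['1-2'], 'latitude': [34.10]})

    dtypes = {'srnumber': String, 'latitude': Float}
    # First batch replaces any existing staging table...
    batch_2015.to_sql('ingest_staging_table', engine, if_exists='replace',
                      index=False, chunksize=10000, dtype=dtypes)
    # ...later batches append to it, as populateFullDatabase does per year.
    batch_2016.to_sql('ingest_staging_table', engine, if_exists='append',
                      index=False, chunksize=10000, dtype=dtypes)

    print(pd.read_sql('SELECT COUNT(*) AS n FROM ingest_staging_table', engine))  # n = 2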

def dumpCsvFile(self, dataset, startDate, requestType, councilName):
'''Output data as CSV by council name, requset type, and

def dumpFilteredCsvFile(self, dataset, startDate, requestType, councilName):
'''Output data as CSV by council name, request type, and
start date (pulls to current date). Arguments should be passed
as strings. Date values must be formatted %Y-%m-%d.'''
df = dataset.copy() # Shard deepcopy to allow multiple endpoints
@@ -166,11 +186,80 @@ def dumpCsvFile(self, dataset, startDate, requestType, councilName):
return df.to_csv()


def saveCsvFile(self, filename):
'''Save contents of self.data to CSV output'''
self.data.to_csv(filename, index=False)


def fetchSocrata(self, year=2019, querySize=10000):
'''Fetch data from Socrata connection and return pandas dataframe'''
# Load config files
socrata_domain = self.config['Socrata']['DOMAIN']
socrata_dataset_identifier = self.config['Socrata']['AP' + str(year)]
socrata_token = self.token
# Establish connection to Socrata resource
client = Socrata(socrata_domain, socrata_token)
# Fetch data
metadata = client.get_metadata(socrata_dataset_identifier)
# Loop for querying dataset
queryDf = None
for i in range(0,querySize,1000):
print(i)
results = client.get(socrata_dataset_identifier,
offset=i,
select="*",
order="updateddate DESC")
tempDf = pd.DataFrame.from_dict(results)
if queryDf is None:
queryDf = tempDf.copy()
else:
queryDf = queryDf.append(tempDf)
self.data = queryDf
# Fetch data
metadata = client.get_metadata(socrata_dataset_identifier)
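
The paging above leans on sodapy's default page size of 1,000 rows; a stand-alone sketch of the same pattern, assuming the public data.lacity.org domain and the 2019 dataset id from settings.example.cfg, with the page size passed explicitly:

    import pandas as pd
    from sodapy import Socrata

    # Anonymous client (token=None) works but is throttled, as in loadConfig.
    client = Socrata('data.lacity.org', None)

    frames = []
    for offset in range(0, 3000, 1000):            # three pages of 1,000 rows
        results = client.get('pvft-t768',
                             offset=offset,
                             limit=1000,
                             select='*',
                             order='updateddate DESC')
        frames.append(pd.DataFrame.from_dict(results))

    df = pd.concat(frames, ignore_index=True)      # concat avoids repeated DataFrame.append
    print(len(df))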


def fetchSocrataFull(self, year=2019, limit=10**7):
'''Fetch entirety of dataset via Socrata'''
# Load config files
print('Downloading %d data from Socrata data source...' % year)
downloadTimer = time.time()
socrata_domain = self.config['Socrata']['DOMAIN']
socrata_dataset_identifier = self.config['Socrata']['AP' + str(year)]
socrata_token = self.token
# Establish connection to Socrata resource
client = Socrata(socrata_domain, socrata_token)
results = client.get(socrata_dataset_identifier, limit=limit)
self.data = pd.DataFrame.from_dict(results)
print('\tDownload Complete: %.1f minutes' % self.elapsedTimer(downloadTimer))


def populateFullDatabase(self, yearRange=range(2015,2021)):
'''Fetches all data from Socrata to populate database
Default operation is to fetch data from 2015-2020
!!! Be aware that each fresh import will wipe the
existing staging table'''
print('Performing fresh Postgres repopulation from Socrata data sources')
tableInit = False
globalTimer = time.time()
for y in yearRange:
self.fetchSocrataFull(year=y)
self.cleanData()
if not tableInit:
self.ingestData(ingestMethod='replace')
tableInit = True
else:
self.ingestData(ingestMethod='append')
print('All Operations Complete: %.1f minutes' % self.elapsedTimer(globalTimer))
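
populateFullDatabase is not exercised by the __main__ block below; a usage sketch, assuming the module is importable from server/src/services and ../settings.cfg points at a reachable Postgres instance:

    from sqlIngest import DataHandler  # assumes server/src/services is on the import path

    # Fresh repopulation: replaces the staging table with 2015 data,
    # then appends each subsequent year.
    loader = DataHandler()
    loader.loadConfig(configFilePath='../settings.cfg')
    loader.populateFullDatabase(yearRange=range(2015, 2021))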


if __name__ == "__main__":
'''Class DataHandler workflow from initial load to SQL population'''
loader = DataHandler()
loader.loadConfig(configFilePath='../settings.cfg')
loader.loadData()
loader.fetchSocrataFull()
loader.cleanData()
loader.ingestData()
loader.dumpCsvFile(dataset="", startDate='2018-05-01', requestType='Bulky Items', councilName='VOICES OF 90037')
loader.saveCsvFile('testfile.csv')
loader.dumpFilteredCsvFile(dataset="", startDate='2018-05-01', requestType='Bulky Items', councilName='VOICES OF 90037')

6 changes: 2 additions & 4 deletions server/src/settings.example.cfg
@@ -10,12 +10,10 @@ DATA_DIRECTORY = static
[Api]
REDACTED = REDACTED

[YearMapping]
2018_MINI = 2018_mini
2018_FULL = 311data

[Socrata]
TOKEN = None
DOMAIN = data.lacity.org
AP2020 = rq3b-xjk8
AP2019 = pvft-t768
AP2018 = h65r-yf5i
AP2017 = d4vt-q4t5