Merge pull request #177 from ryanmswan/dev
SQL Ingestion functionality updates
sellnat77 authored Jan 15, 2020
2 parents 418f9f9 + 144c3f5 commit 1b13543
Showing 2 changed files with 116 additions and 29 deletions.
139 changes: 114 additions & 25 deletions server/src/services/sqlIngest.py
@@ -1,11 +1,14 @@
import os
from sqlalchemy.types import Integer, Text, String, DateTime, Float
from sqlalchemy import create_engine
import sqlalchemy as db
import pandas as pd
from configparser import ConfigParser # For configparser compatible formatting see: https://docs.python.org/3/library/configparser.html
import numpy as np
import logging
import io
from sodapy import Socrata
import time

class DataHandler:
def __init__(self, config=None, configFilePath=None, separator=','):
@@ -15,6 +18,11 @@ def __init__(self, config=None, configFilePath=None, separator=','):
self.filePath = None
self.configFilePath = configFilePath
self.separator = separator
self.fields = ['srnumber', 'createddate', 'updateddate', 'actiontaken', 'owner', 'requesttype', 'status', 'requestsource',
'createdbyuserorganization', 'mobileos', 'anonymous', 'assignto', 'servicedate', 'closeddate',
'addressverified', 'approximateaddress', 'address', 'housenumber', 'direction', 'streetname',
'suffix', 'zipcode', 'latitude', 'longitude', 'location', 'tbmpage', 'tbmcolumn', 'tbmrow',
'apc', 'cd', 'cdmember', 'nc', 'ncname', 'policeprecinct']


def loadConfig(self, configFilePath):
@@ -29,6 +37,7 @@ def loadConfig(self, configFilePath):
config.read(configFilePath)
self.config = config
self.dbString = config['Database']['DB_CONNECTION_STRING']
self.token = None if config['Socrata']['TOKEN'] == 'None' else config['Socrata']['TOKEN']


def loadData(self, fileName="2018_mini"):
@@ -79,41 +88,50 @@ def loadData(self, fileName="2018_mini"):
'PolicePrecinct':str
})


def elapsedTimer(self, timeVal):
'''Simple timer method to report on elapsed time for each method'''
return (time.time() - timeVal) / 60


def cleanData(self):
'''Perform general data filtering'''
print('Cleaning 311 dataset...')
print('Cleaning data...')
cleanTimer = time.time()
data = self.data
zipIndex = (data['ZipCode'].str.isdigit()) | (data['ZipCode'].isna())
data['ZipCode'].loc[~zipIndex] = np.nan
zipIndex = (data['zipcode'].str.isdigit()) | (data['zipcode'].isna())
data['zipcode'].loc[~zipIndex] = np.nan
# Format dates as datetime (Time intensive)
data['CreatedDate'] = pd.to_datetime(data['CreatedDate'])
data['ClosedDate'] = pd.to_datetime(data['ClosedDate'])
data['ServiceDate'] = pd.to_datetime(data['ServiceDate'])
# New columns: closed_created, service_created
# xNOTE: SQLAlchemy/Postgres will convert these time deltas to integer values
# May wish to change these to a different format or compute after fact
# data['closed_created'] = data.ClosedDate-data.CreatedDate
# data['service_created'] = data.ServiceDate-data.CreatedDate
# drop NA values and reformat closed_created in units of hours
# data = data[~data.closed_created.isna()]
# New column: closed_created in units of days
# data['closed_createdD'] = data.closed_created / pd.Timedelta(days=1)
# xFUTURE: Geolocation/time clustering to weed out repeat requests
# xFUTURE: Decide whether ServiceDate or ClosedDate are primary metric
# xFUTURE: Removal of feedback and other categories
data['createddate'] = pd.to_datetime(data['createddate'])
data['closeddate'] = pd.to_datetime(data['closeddate'])
data['servicedate'] = pd.to_datetime(data['servicedate'])
data['location'] = data.location.astype(str)
# Check for column consistency
for f in self.fields:
if f not in self.data.columns:
print('\tcolumn %s missing - substituting NaN values' % f)
data[f] = np.NaN
for f in data.columns:
if f not in self.fields:
print('\tcolumn %s not in defined set - dropping column' % f)
data = data[self.fields]
# self.data = self.data.drop(f)
self.data = data
print('\tCleaning Complete: %.1f minutes' % self.elapsedTimer(cleanTimer))


def ingestData(self):
def ingestData(self, ingestMethod='replace'):
'''Set up connection to database'''
print('Inserting data into Postgres instance...')
ingestTimer = time.time()
data = self.data.copy() # shard deepcopy to allow other endpoint operations
engine = create_engine(self.dbString)
engine = db.create_engine(self.dbString)
newColumns = [column.replace(' ', '_').lower() for column in data]
data.columns = newColumns
# Ingest data
data.to_sql("ingest_staging_table",
engine,
if_exists='replace',
if_exists=ingestMethod,
schema='public',
index=False,
chunksize=10000,
@@ -151,9 +169,11 @@ def ingestData(self):
'nc':Float,
'ncname':String,
'policeprecinct':String})
print('\tIngest Complete: %.1f minutes' % self.elapsedTimer(ingestTimer))

def dumpCsvFile(self, dataset, startDate, requestType, councilName):
'''Output data as CSV by council name, requset type, and

def dumpFilteredCsvFile(self, dataset, startDate, requestType, councilName):
'''Output data as CSV by council name, request type, and
start date (pulls to current date). Arguments should be passed
as strings. Date values must be formatted %Y-%m-%d.'''
df = dataset.copy() # Shard deepcopy to allow multiple endpoints
@@ -166,11 +186,80 @@ def dumpCsvFile(self, dataset, startDate, requestType, councilName):
return df.to_csv()


def saveCsvFile(self, filename):
'''Save contents of self.data to CSV output'''
self.data.to_csv(filename, index=False)


def fetchSocrata(self, year=2019, querySize=10000):
'''Fetch data from Socrata connection and return pandas dataframe'''
# Load config files
socrata_domain = self.config['Socrata']['DOMAIN']
socrata_dataset_identifier = self.config['Socrata']['AP' + str(year)]
socrata_token = self.token
# Establish connection to Socrata resource
client = Socrata(socrata_domain, socrata_token)
# Fetch data
metadata = client.get_metadata(socrata_dataset_identifier)
# Loop for querying dataset
queryDf = None
for i in range(0,querySize,1000):
print(i)
results = client.get(socrata_dataset_identifier,
offset=i,
select="*",
order="updateddate DESC")
tempDf = pd.DataFrame.from_dict(results)
if queryDf is None:
queryDf = tempDf.copy()
else:
queryDf = queryDf.append(tempDf)
self.data = queryDf
# Fetch data
metadata = client.get_metadata(socrata_dataset_identifier)


def fetchSocrataFull(self, year=2019, limit=10**7):
'''Fetch entirety of dataset via Socrata'''
# Load config files
print('Downloading %d data from Socrata data source...' % year)
downloadTimer = time.time()
socrata_domain = self.config['Socrata']['DOMAIN']
socrata_dataset_identifier = self.config['Socrata']['AP' + str(year)]
socrata_token = self.token
# Establish connection to Socrata resource
client = Socrata(socrata_domain, socrata_token)
results = client.get(socrata_dataset_identifier, limit=limit)
self.data = pd.DataFrame.from_dict(results)
print('\tDownload Complete: %.1f minutes' % self.elapsedTimer(downloadTimer))


def populateFullDatabase(self, yearRange=range(2015,2021)):
'''Fetches all data from Socrata to populate database
Default operation is to fetch data from 2015-2020
!!! Be aware that each fresh import will wipe the
existing staging table'''
print('Performing fresh Postgres repopulation from Socrata data sources')
tableInit = False
globalTimer = time.time()
for y in yearRange:
self.fetchSocrataFull(year=y)
self.cleanData()
if not tableInit:
self.ingestData(ingestMethod='replace')
tableInit = True
else:
self.ingestData(ingestMethod='append')
print('All Operations Complete: %.1f minutes' % self.elapsedTimer(globalTimer))


if __name__ == "__main__":
'''Class DataHandler workflow from initial load to SQL population'''
loader = DataHandler()
loader.loadConfig(configFilePath='../settings.cfg')
loader.loadData()
loader.fetchSocrataFull()
loader.cleanData()
loader.ingestData()
loader.dumpCsvFile(dataset="", startDate='2018-05-01', requestType='Bulky Items', councilName='VOICES OF 90037')
loader.saveCsvFile('testfile.csv')
loader.dumpFilteredCsvFile(dataset="", startDate='2018-05-01', requestType='Bulky Items', councilName='VOICES OF 90037')
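For reference, a minimal sketch of how the ingestion workflow added in this commit might be driven end to end. It follows the populateFullDatabase method and the __main__ block above; the module import path, config location, and output filename are assumptions for illustration, not part of the commit.

# Hypothetical driver script; import path, config path, and filename are assumed.
from sqlIngest import DataHandler

loader = DataHandler()
loader.loadConfig(configFilePath='../settings.cfg')  # filled-in copy of settings.example.cfg

# Fetch each year from Socrata, clean it, and load it into Postgres.
# The first year replaces ingest_staging_table; subsequent years are appended.
loader.populateFullDatabase(yearRange=range(2015, 2021))

# Keep a local CSV copy of the most recently fetched year's cleaned data.
loader.saveCsvFile('311data_backup.csv')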

6 changes: 2 additions & 4 deletions server/src/settings.example.cfg
@@ -10,12 +10,10 @@ DATA_DIRECTORY = static
[Api]
REDACTED = REDACTED

[YearMapping]
2018_MINI = 2018_mini
2018_FULL = 311data

[Socrata]
TOKEN = None
DOMAIN = data.lacity.org
AP2020 = rq3b-xjk8
AP2019 = pvft-t768
AP2018 = h65r-yf5i
AP2017 = d4vt-q4t5
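
As a side note, a small sketch of how the updated loadConfig consumes these settings: the connection string comes from the [Database] section, and the literal string None for TOKEN is converted to a Python None (anonymous Socrata access). Key names match the code in this commit; the paths and example comments are illustrative assumptions.

# Illustrative only: reading the same keys that loadConfig and fetchSocrataFull use.
from configparser import ConfigParser

config = ConfigParser()
config.read('settings.cfg')  # assumed filled-in copy of settings.example.cfg

db_string = config['Database']['DB_CONNECTION_STRING']  # Postgres connection URL
token = None if config['Socrata']['TOKEN'] == 'None' else config['Socrata']['TOKEN']
domain = config['Socrata']['DOMAIN']        # data.lacity.org
dataset_2019 = config['Socrata']['AP2019']  # pvft-t768, the 2019 dataset identifier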
