Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #265 Added pinService and started data layer abstraction #266

Merged
merged 1 commit into from
Feb 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions server/src/app.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import os
from sanic import Sanic
from sanic.response import json
from services.time_to_close import time_to_close
from services.frequency import frequency
from services.ingress_service import ingress_service
from configparser import ConfigParser
from threading import Timer
from multiprocessing import cpu_count
from services.sqlIngest import DataHandler
from datetime import datetime
from multiprocessing import cpu_count

from services.time_to_close import time_to_close
from services.frequency import frequency
from services.pinService import PinService
from services.ingress_service import ingress_service
from services.sqlIngest import DataHandler

app = Sanic(__name__)

Expand Down Expand Up @@ -77,7 +78,8 @@ async def ingest(request):
return json({"error": "'years' parameter is required."})
years = set([int(year) for year in request.args.get("years").split(",")])
if not all(year in ALLOWED_YEARS for year in years):
return json({"error": f"'years' parameter values must be one of {ALLOWED_YEARS}"})
return json({"error":
f"'years' param values must be one of {ALLOWED_YEARS}"})
loader = DataHandler()
loader.loadConfig(configFilePath='./settings.cfg')
loader.populateFullDatabase(yearRange=years)
Expand All @@ -99,6 +101,22 @@ async def delete(request):
return json(return_data)


@app.route('/pins', methods=["POST"])
async def pinMap(request):
pin_worker = PinService(app.config['Settings'])
postArgs = request.json
start = postArgs.get('startDate', '2015-01-01')
end = postArgs.get('endDate', '2015-12-31 01:01:01')
ncs = postArgs.get('ncList', ['SHERMAN OAKS NC'])
requests = postArgs.get('requestTypes', ['Bulky Items'])

return_data = await pin_worker.get_base_pins(startDate=start,
endDate=end,
ncList=ncs,
requestTypes=requests)
return json(return_data)


@app.route('/test_multiple_workers')
async def test_multiple_workers(request):
Timer(10.0, print, ["Timer Test."]).start()
Expand Down
39 changes: 39 additions & 0 deletions server/src/services/dataService.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import sqlalchemy as db
import pandas as pd


class DataService(object):
def includeMeta(func):
def inner1(*args, **kwargs):
dataResponse = func(*args, **kwargs)

withMeta = {'lastPulled': 'NOW', 'data': dataResponse}
print(withMeta)
return withMeta

return inner1

def __init__(self, config=None, tableName="ingest_staging_table"):
self.config = config
self.dbString = None if not self.config \
else self.config['Database']['DB_CONNECTION_STRING']

self.table = tableName
self.data = None
self.engine = db.create_engine(self.dbString)

@includeMeta
def query(self, queryItems=[], queryfilters=[], limit=None):
items = ', '.join(queryItems)
query = 'SELECT {} FROM {}'.format(items, self.table)
if queryfilters:
filters = ' AND '.join(queryfilters)
query += ' WHERE {}'.format(filters)
if limit:
query += ' LIMIT {}'.format(limit)

df = pd.read_sql_query(query, con=self.engine)
return df.to_dict(orient='records')

def storedProc(self):
pass
48 changes: 48 additions & 0 deletions server/src/services/pinService.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from .dataService import DataService


class PinService(object):
def __init__(self, config=None, tableName="ingest_staging_table"):
self.dataAccess = DataService(config, tableName)

async def get_base_pins(self,
startDate='',
endDate='',
ncList=[],
requestTypes=[]):
"""
Returns the base pin data given times, ncs, and request filters
{
'LastPulled': 'Timestamp',
'data': [
{
'ncname':'String',
'requesttype':'String',
'srnumber':'String',
'latitude': 'String',
'longitude': 'String',
'address': 'String',
'createddate': 'Timestamp'
}
]
}
"""

items = ['ncname',
'requesttype',
'srnumber',
'latitude',
'longitude',
'address',
'createddate']

ncs = '\'' + '\', \''.join(ncList) + '\''
requests = '\'' + '\', \''.join(requestTypes) + '\''

filters = ['createddate > \'{}\''.format(startDate),
'createddate < \'{}\''.format(endDate),
'ncname IN ({})'.format(ncs),
'requesttype IN ({})'.format(requests)]
result = self.dataAccess.query(items, filters)

return result
21 changes: 9 additions & 12 deletions server/src/services/sqlIngest.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
import os
import sqlalchemy as db
import pandas as pd
from configparser import ConfigParser
import time
import numpy as np
import pandas as pd
import sqlalchemy as db
from sodapy import Socrata
import time
if __name__ == '__main__':
import databaseOrm # Contains database specs and field definitions
else:
from . import databaseOrm # Contains database specs and field definitions
from .databaseOrm import tableFields, insertFields, readFields # Contains db specs and field definitions
from configparser import ConfigParser


class DataHandler:
Expand All @@ -20,9 +17,9 @@ def __init__(self, config=None, configFilePath=None, separator=','):
self.filePath = None
self.configFilePath = configFilePath
self.separator = separator
self.fields = databaseOrm.tableFields
self.insertParams = databaseOrm.insertFields
self.readParams = databaseOrm.readFields
self.fields = tableFields
self.insertParams = insertFields
self.readParams = readFields
self.dialect = None

def loadConfig(self, configFilePath):
Expand Down Expand Up @@ -180,7 +177,7 @@ def populateFullDatabase(self, yearRange=range(2015, 2021)):
Default operation is to fetch data from 2015-2020
!!! Be aware that each fresh import will wipe the
existing staging table'''
print('Performing fresh ' + self.dialect + ' population from Socrata data sources')
print('Performing {} population from data source'.format(self.dialect))
tableInit = False
globalTimer = time.time()
for y in yearRange:
Expand Down