Merge pull request #266 from hackforla/265_BE_Pinmap
Fixes #265 Added pinService and started data layer abstraction
sellnat77 authored Feb 19, 2020
2 parents f18d88e + 91c7331 commit 661dd00
Showing 4 changed files with 120 additions and 18 deletions.
30 changes: 24 additions & 6 deletions server/src/app.py
@@ -1,15 +1,16 @@
import os
from sanic import Sanic
from sanic.response import json
from services.time_to_close import time_to_close
from services.frequency import frequency
from services.ingress_service import ingress_service
from configparser import ConfigParser
from threading import Timer
from multiprocessing import cpu_count
from services.sqlIngest import DataHandler
from datetime import datetime
from multiprocessing import cpu_count

from services.time_to_close import time_to_close
from services.frequency import frequency
from services.pinService import PinService
from services.ingress_service import ingress_service
from services.sqlIngest import DataHandler

app = Sanic(__name__)

@@ -77,7 +78,8 @@ async def ingest(request):
return json({"error": "'years' parameter is required."})
years = set([int(year) for year in request.args.get("years").split(",")])
if not all(year in ALLOWED_YEARS for year in years):
return json({"error": f"'years' parameter values must be one of {ALLOWED_YEARS}"})
return json({"error":
f"'years' param values must be one of {ALLOWED_YEARS}"})
loader = DataHandler()
loader.loadConfig(configFilePath='./settings.cfg')
loader.populateFullDatabase(yearRange=years)
@@ -99,6 +101,22 @@ async def delete(request):
    return json(return_data)


@app.route('/pins', methods=["POST"])
async def pinMap(request):
    pin_worker = PinService(app.config['Settings'])
    postArgs = request.json
    start = postArgs.get('startDate', '2015-01-01')
    end = postArgs.get('endDate', '2015-12-31 01:01:01')
    ncs = postArgs.get('ncList', ['SHERMAN OAKS NC'])
    requests = postArgs.get('requestTypes', ['Bulky Items'])

    return_data = await pin_worker.get_base_pins(startDate=start,
                                                 endDate=end,
                                                 ncList=ncs,
                                                 requestTypes=requests)
    return json(return_data)


@app.route('/test_multiple_workers')
async def test_multiple_workers(request):
    Timer(10.0, print, ["Timer Test."]).start()
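The new /pins route accepts a JSON body of date, neighborhood-council, and request-type filters and returns the matching pin records. A minimal client sketch follows; the host and port (localhost:8000) and the use of the requests library are assumptions for illustration, not part of this diff.

# Hypothetical client call; adjust host/port to match your settings.cfg.
import requests

payload = {
    "startDate": "2015-01-01",
    "endDate": "2015-12-31 01:01:01",
    "ncList": ["SHERMAN OAKS NC"],
    "requestTypes": ["Bulky Items"],
}

resp = requests.post("http://localhost:8000/pins", json=payload)
# The response is the envelope produced by DataService.includeMeta:
# {'lastPulled': ..., 'data': [...]}
print(resp.json())
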
39 changes: 39 additions & 0 deletions server/src/services/dataService.py
@@ -0,0 +1,39 @@
import sqlalchemy as db
import pandas as pd


class DataService(object):
    def includeMeta(func):
        def inner1(*args, **kwargs):
            dataResponse = func(*args, **kwargs)

            withMeta = {'lastPulled': 'NOW', 'data': dataResponse}
            print(withMeta)
            return withMeta

        return inner1

    def __init__(self, config=None, tableName="ingest_staging_table"):
        self.config = config
        self.dbString = None if not self.config \
            else self.config['Database']['DB_CONNECTION_STRING']

        self.table = tableName
        self.data = None
        self.engine = db.create_engine(self.dbString)

    @includeMeta
    def query(self, queryItems=[], queryfilters=[], limit=None):
        items = ', '.join(queryItems)
        query = 'SELECT {} FROM {}'.format(items, self.table)
        if queryfilters:
            filters = ' AND '.join(queryfilters)
            query += ' WHERE {}'.format(filters)
        if limit:
            query += ' LIMIT {}'.format(limit)

        df = pd.read_sql_query(query, con=self.engine)
        return df.to_dict(orient='records')

    def storedProc(self):
        pass
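For reference, a sketch of how DataService.query composes its SQL; the connection string below is a placeholder assumption, and the includeMeta decorator wraps the row list in a {'lastPulled': ..., 'data': ...} envelope.

# Placeholder connection string; any SQLAlchemy-supported database works.
config = {'Database': {'DB_CONNECTION_STRING': 'postgresql://user:pass@localhost/311'}}
svc = DataService(config)

# Builds and runs:
#   SELECT srnumber, latitude, longitude FROM ingest_staging_table
#   WHERE createddate > '2015-01-01' LIMIT 5
result = svc.query(['srnumber', 'latitude', 'longitude'],
                   ["createddate > '2015-01-01'"],
                   limit=5)
# result == {'lastPulled': 'NOW', 'data': [{...}, ...]}
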
48 changes: 48 additions & 0 deletions server/src/services/pinService.py
@@ -0,0 +1,48 @@
from .dataService import DataService


class PinService(object):
    def __init__(self, config=None, tableName="ingest_staging_table"):
        self.dataAccess = DataService(config, tableName)

    async def get_base_pins(self,
                            startDate='',
                            endDate='',
                            ncList=[],
                            requestTypes=[]):
        """
        Returns the base pin data given times, ncs, and request filters
        {
            'LastPulled': 'Timestamp',
            'data': [
                {
                    'ncname': 'String',
                    'requesttype': 'String',
                    'srnumber': 'String',
                    'latitude': 'String',
                    'longitude': 'String',
                    'address': 'String',
                    'createddate': 'Timestamp'
                }
            ]
        }
        """

        items = ['ncname',
                 'requesttype',
                 'srnumber',
                 'latitude',
                 'longitude',
                 'address',
                 'createddate']

        ncs = '\'' + '\', \''.join(ncList) + '\''
        requests = '\'' + '\', \''.join(requestTypes) + '\''

        filters = ['createddate > \'{}\''.format(startDate),
                   'createddate < \'{}\''.format(endDate),
                   'ncname IN ({})'.format(ncs),
                   'requesttype IN ({})'.format(requests)]
        result = self.dataAccess.query(items, filters)

        return result
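get_base_pins is a coroutine, so outside a Sanic request handler it needs an event loop. A usage sketch under assumptions: the config dict mirrors the nested shape DataService reads, and the connection string is a placeholder.

import asyncio

# Assumed config shape; in the app this comes from app.config['Settings'].
config = {'Database': {'DB_CONNECTION_STRING': 'postgresql://user:pass@localhost/311'}}
worker = PinService(config)

pins = asyncio.run(worker.get_base_pins(startDate='2015-01-01',
                                        endDate='2015-12-31',
                                        ncList=['SHERMAN OAKS NC'],
                                        requestTypes=['Bulky Items']))
# pins['data'] is a list of dicts with ncname, requesttype, srnumber,
# latitude, longitude, address, and createddate keys.
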
21 changes: 9 additions & 12 deletions server/src/services/sqlIngest.py
@@ -1,14 +1,11 @@
import os
import sqlalchemy as db
import pandas as pd
from configparser import ConfigParser
import time
import numpy as np
import pandas as pd
import sqlalchemy as db
from sodapy import Socrata
import time
if __name__ == '__main__':
    import databaseOrm # Contains database specs and field definitions
else:
    from . import databaseOrm # Contains database specs and field definitions
from .databaseOrm import tableFields, insertFields, readFields # Contains db specs and field definitions
from configparser import ConfigParser


class DataHandler:
@@ -20,9 +17,9 @@ def __init__(self, config=None, configFilePath=None, separator=','):
        self.filePath = None
        self.configFilePath = configFilePath
        self.separator = separator
        self.fields = databaseOrm.tableFields
        self.insertParams = databaseOrm.insertFields
        self.readParams = databaseOrm.readFields
        self.fields = tableFields
        self.insertParams = insertFields
        self.readParams = readFields
        self.dialect = None

    def loadConfig(self, configFilePath):
@@ -180,7 +177,7 @@ def populateFullDatabase(self, yearRange=range(2015, 2021)):
        Default operation is to fetch data from 2015-2020
        !!! Be aware that each fresh import will wipe the
        existing staging table'''
        print('Performing fresh ' + self.dialect + ' population from Socrata data sources')
        print('Performing {} population from data source'.format(self.dialect))
        tableInit = False
        globalTimer = time.time()
        for y in yearRange:
