From 91c73314cd455609a007245af186cb3db83a454d Mon Sep 17 00:00:00 2001 From: sellnat77 Date: Tue, 18 Feb 2020 23:22:09 -0800 Subject: [PATCH] Fixes #265 Added pinService and started data layer abstraction --- server/src/app.py | 30 +++++++++++++++---- server/src/services/dataService.py | 39 ++++++++++++++++++++++++ server/src/services/pinService.py | 48 ++++++++++++++++++++++++++++++ server/src/services/sqlIngest.py | 21 ++++++------- 4 files changed, 120 insertions(+), 18 deletions(-) create mode 100644 server/src/services/dataService.py create mode 100644 server/src/services/pinService.py diff --git a/server/src/app.py b/server/src/app.py index b630c9394..537b4dc8f 100644 --- a/server/src/app.py +++ b/server/src/app.py @@ -1,15 +1,16 @@ import os from sanic import Sanic from sanic.response import json -from services.time_to_close import time_to_close -from services.frequency import frequency -from services.ingress_service import ingress_service from configparser import ConfigParser from threading import Timer -from multiprocessing import cpu_count -from services.sqlIngest import DataHandler from datetime import datetime +from multiprocessing import cpu_count +from services.time_to_close import time_to_close +from services.frequency import frequency +from services.pinService import PinService +from services.ingress_service import ingress_service +from services.sqlIngest import DataHandler app = Sanic(__name__) @@ -77,7 +78,8 @@ async def ingest(request): return json({"error": "'years' parameter is required."}) years = set([int(year) for year in request.args.get("years").split(",")]) if not all(year in ALLOWED_YEARS for year in years): - return json({"error": f"'years' parameter values must be one of {ALLOWED_YEARS}"}) + return json({"error": + f"'years' param values must be one of {ALLOWED_YEARS}"}) loader = DataHandler() loader.loadConfig(configFilePath='./settings.cfg') loader.populateFullDatabase(yearRange=years) @@ -99,6 +101,22 @@ async def delete(request): return json(return_data) +@app.route('/pins', methods=["POST"]) +async def pinMap(request): + pin_worker = PinService(app.config['Settings']) + postArgs = request.json + start = postArgs.get('startDate', '2015-01-01') + end = postArgs.get('endDate', '2015-12-31 01:01:01') + ncs = postArgs.get('ncList', ['SHERMAN OAKS NC']) + requests = postArgs.get('requestTypes', ['Bulky Items']) + + return_data = await pin_worker.get_base_pins(startDate=start, + endDate=end, + ncList=ncs, + requestTypes=requests) + return json(return_data) + + @app.route('/test_multiple_workers') async def test_multiple_workers(request): Timer(10.0, print, ["Timer Test."]).start() diff --git a/server/src/services/dataService.py b/server/src/services/dataService.py new file mode 100644 index 000000000..df42ac763 --- /dev/null +++ b/server/src/services/dataService.py @@ -0,0 +1,39 @@ +import sqlalchemy as db +import pandas as pd + + +class DataService(object): + def includeMeta(func): + def inner1(*args, **kwargs): + dataResponse = func(*args, **kwargs) + + withMeta = {'lastPulled': 'NOW', 'data': dataResponse} + print(withMeta) + return withMeta + + return inner1 + + def __init__(self, config=None, tableName="ingest_staging_table"): + self.config = config + self.dbString = None if not self.config \ + else self.config['Database']['DB_CONNECTION_STRING'] + + self.table = tableName + self.data = None + self.engine = db.create_engine(self.dbString) + + @includeMeta + def query(self, queryItems=[], queryfilters=[], limit=None): + items = ', '.join(queryItems) + query = 'SELECT {} FROM {}'.format(items, self.table) + if queryfilters: + filters = ' AND '.join(queryfilters) + query += ' WHERE {}'.format(filters) + if limit: + query += ' LIMIT {}'.format(limit) + + df = pd.read_sql_query(query, con=self.engine) + return df.to_dict(orient='records') + + def storedProc(self): + pass diff --git a/server/src/services/pinService.py b/server/src/services/pinService.py new file mode 100644 index 000000000..8938ba488 --- /dev/null +++ b/server/src/services/pinService.py @@ -0,0 +1,48 @@ +from .dataService import DataService + + +class PinService(object): + def __init__(self, config=None, tableName="ingest_staging_table"): + self.dataAccess = DataService(config, tableName) + + async def get_base_pins(self, + startDate='', + endDate='', + ncList=[], + requestTypes=[]): + """ + Returns the base pin data given times, ncs, and request filters + { + 'LastPulled': 'Timestamp', + 'data': [ + { + 'ncname':'String', + 'requesttype':'String', + 'srnumber':'String', + 'latitude': 'String', + 'longitude': 'String', + 'address': 'String', + 'createddate': 'Timestamp' + } + ] + } + """ + + items = ['ncname', + 'requesttype', + 'srnumber', + 'latitude', + 'longitude', + 'address', + 'createddate'] + + ncs = '\'' + '\', \''.join(ncList) + '\'' + requests = '\'' + '\', \''.join(requestTypes) + '\'' + + filters = ['createddate > \'{}\''.format(startDate), + 'createddate < \'{}\''.format(endDate), + 'ncname IN ({})'.format(ncs), + 'requesttype IN ({})'.format(requests)] + result = self.dataAccess.query(items, filters) + + return result diff --git a/server/src/services/sqlIngest.py b/server/src/services/sqlIngest.py index 24e89dc55..08cc9a155 100644 --- a/server/src/services/sqlIngest.py +++ b/server/src/services/sqlIngest.py @@ -1,14 +1,11 @@ import os -import sqlalchemy as db -import pandas as pd -from configparser import ConfigParser +import time import numpy as np +import pandas as pd +import sqlalchemy as db from sodapy import Socrata -import time -if __name__ == '__main__': - import databaseOrm # Contains database specs and field definitions -else: - from . import databaseOrm # Contains database specs and field definitions +from .databaseOrm import tableFields, insertFields, readFields # Contains db specs and field definitions +from configparser import ConfigParser class DataHandler: @@ -20,9 +17,9 @@ def __init__(self, config=None, configFilePath=None, separator=','): self.filePath = None self.configFilePath = configFilePath self.separator = separator - self.fields = databaseOrm.tableFields - self.insertParams = databaseOrm.insertFields - self.readParams = databaseOrm.readFields + self.fields = tableFields + self.insertParams = insertFields + self.readParams = readFields self.dialect = None def loadConfig(self, configFilePath): @@ -180,7 +177,7 @@ def populateFullDatabase(self, yearRange=range(2015, 2021)): Default operation is to fetch data from 2015-2020 !!! Be aware that each fresh import will wipe the existing staging table''' - print('Performing fresh ' + self.dialect + ' population from Socrata data sources') + print('Performing {} population from data source'.format(self.dialect)) tableInit = False globalTimer = time.time() for y in yearRange: