From 84873ec47dd3473fa6961efed110f058c1b9dab4 Mon Sep 17 00:00:00 2001 From: Jake Mensch Date: Sun, 8 Mar 2020 07:54:45 -0700 Subject: [PATCH 1/6] added requestcounts endpoint --- server/src/app.py | 20 ++++++ server/src/services/dataService.py | 13 ++-- server/src/services/requestCountsService.py | 70 +++++++++++++++++++++ 3 files changed, 98 insertions(+), 5 deletions(-) create mode 100644 server/src/services/requestCountsService.py diff --git a/server/src/app.py b/server/src/app.py index 3bd8111bd..3421c3ea2 100644 --- a/server/src/app.py +++ b/server/src/app.py @@ -11,6 +11,7 @@ from services.time_to_close import time_to_close from services.frequency import frequency from services.pinService import PinService +from services.requestCountsService import RequestCountsService from services.requestDetailService import RequestDetailService from services.ingress_service import ingress_service from services.sqlIngest import DataHandler @@ -140,6 +141,25 @@ async def pinMap(request): return json(return_data) +@app.route('/requestcounts', methods=["POST"]) +@compress.compress() +async def requestCounts(request): + counts_worker = RequestCountsService(app.config['Settings']) + postArgs = request.json + start = postArgs.get('startDate', None) + end = postArgs.get('endDate', None) + ncs = postArgs.get('ncList', []) + requests = postArgs.get('requestTypes', []) + countFields = postArgs.get('countFields', []) + + return_data = await counts_worker.get_req_counts(startDate=start, + endDate=end, + ncList=ncs, + requestTypes=requests, + countFields=countFields) + return json(return_data) + + @app.route('/servicerequest/', methods=["GET"]) async def requestDetails(request, srnumber): detail_worker = RequestDetailService(app.config['Settings']) diff --git a/server/src/services/dataService.py b/server/src/services/dataService.py index ddf364e4b..658cff837 100644 --- a/server/src/services/dataService.py +++ b/server/src/services/dataService.py @@ -4,17 +4,19 @@ class DataService(object): + def withMeta(dataResponse): + # Will represent last time the ingest pipeline ran + lastPulledTimestamp = datetime.datetime.utcnow() + return {'lastPulled': lastPulledTimestamp, + 'data': dataResponse} + def includeMeta(func): def innerFunc(*args, **kwargs): dataResponse = func(*args, **kwargs) if 'Error' in dataResponse: return dataResponse - # Will represent last time the ingest pipeline ran - lastPulledTimestamp = datetime.datetime.utcnow() - withMeta = {'lastPulled': lastPulledTimestamp, - 'data': dataResponse} - return withMeta + return DataService.withMeta(dataResponse) return innerFunc @@ -26,6 +28,7 @@ def __init__(self, config=None, tableName="ingest_staging_table"): self.table = tableName self.data = None self.engine = db.create_engine(self.dbString) + self.session = db.orm.sessionmaker(bind=self.engine)() @includeMeta def query(self, queryItems=None, queryfilters=[], limit=None): diff --git a/server/src/services/requestCountsService.py b/server/src/services/requestCountsService.py new file mode 100644 index 000000000..801da309c --- /dev/null +++ b/server/src/services/requestCountsService.py @@ -0,0 +1,70 @@ +from sqlalchemy import func +from .dataService import DataService +from .databaseOrm import Ingest as Request + + +class RequestCountsService(object): + def __init__(self, config=None, tableName="ingest_staging_table"): + self.dataAccess = DataService(config, tableName) + + async def get_req_counts(self, + startDate=None, + endDate=None, + ncList=[], + requestTypes=[], + countFields=[]): + """ + For each countField, returns the counts of each distinct value + in that field, given times, ncs, and request filters. + E.g. if countsFields is ['requesttype', 'requestsource'], returns: + { + 'lastPulled': 'Timestamp', + 'data': [ + { + 'field': 'requesttype', + 'counts': { + 'Graffiti Removal': 'Int', + 'Bulky Items': 'Int', + ... + } + }, + { + 'field': 'requestsource', + 'counts': { + 'Mobile App': 'Int', + 'Driver Self Report': 'Int', + ... + } + } + ] + } + """ + + # filter by date, nc, and requestType (if provided) + filters = [ + Request.createddate > startDate if startDate else True, + Request.createddate < endDate if endDate else True, + Request.ncname.in_(ncList) if ncList else True, + Request.requesttype.in_(requestTypes) if requestTypes else True + ] + + data = [] + for field in countFields: + # make sure the field exists in the Request model + if not getattr(Request, field, None): + continue + + # run count/groupby query + results = self.dataAccess.session \ + .query(field, func.count()) \ + .filter(*filters) \ + .group_by(field) \ + .all() + + # add results to data set + data.append({ + 'field': field, + 'counts': dict(results) + }) + + return DataService.withMeta(data) From 8b675a368322a5258696c0ef688308ce006ae984 Mon Sep 17 00:00:00 2001 From: Jake Mensch Date: Sun, 8 Mar 2020 17:50:46 -0700 Subject: [PATCH 2/6] changed sqlalchemy.orm import to avoid pytest error --- server/src/services/dataService.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/services/dataService.py b/server/src/services/dataService.py index 658cff837..c0ebbd3b0 100644 --- a/server/src/services/dataService.py +++ b/server/src/services/dataService.py @@ -1,6 +1,7 @@ import datetime import pandas as pd import sqlalchemy as db +from sqlalchemy.orm import sessionmaker class DataService(object): @@ -28,7 +29,7 @@ def __init__(self, config=None, tableName="ingest_staging_table"): self.table = tableName self.data = None self.engine = db.create_engine(self.dbString) - self.session = db.orm.sessionmaker(bind=self.engine)() + self.session = sessionmaker(bind=self.engine)() @includeMeta def query(self, queryItems=None, queryfilters=[], limit=None): From 3d5d998ac99cda703af9105487889a93bbb24a0b Mon Sep 17 00:00:00 2001 From: sellnat77 Date: Sun, 8 Mar 2020 17:52:15 -0700 Subject: [PATCH 3/6] Sqlingest was parsing querySize and limit as strings if provided, parse as int before use --- server/src/services/sqlIngest.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/server/src/services/sqlIngest.py b/server/src/services/sqlIngest.py index 8553ddf26..2cd31610a 100644 --- a/server/src/services/sqlIngest.py +++ b/server/src/services/sqlIngest.py @@ -139,15 +139,17 @@ def fetchSocrata(self, # Fetch data # Loop for querying dataset tableInit = False - for i in range(0, totalRequestRecords, querySize): + query = int(querySize) + maxRecords = int(totalRequestRecords) + for i in range(0, maxRecords, query): fetchTimer = time.time() print('Fetching %d records with offset %d up to a max of %d' - % (querySize, i, totalRequestRecords)) + % (query, i, maxRecords)) results = client.get(socrata_dataset_identifier, offset=i, select="*", order="updateddate DESC", - limit=querySize) + limit=query) if not results: break tempDf = pd.DataFrame.from_dict(results) From a76ef2564607cc39f2557280e37ab431910de083 Mon Sep 17 00:00:00 2001 From: sellnat77 Date: Sun, 8 Mar 2020 17:56:54 -0700 Subject: [PATCH 4/6] Added pull request template --- .github/PULL_REQUEST_TEMPLATE.md | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 .github/PULL_REQUEST_TEMPLATE.md diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md new file mode 100644 index 000000000..739b02194 --- /dev/null +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -0,0 +1,8 @@ +Fixes #{issue number here} + + - [ ] Up to date with `dev` branch + - [ ] Branch name follows [guidelines](https://github.com/hackforla/311-data/blob/master/GETTING_STARTED.md#feature-branching) + - [ ] All PR Status checks are successful + - [ ] Peer reviewed and approved + +Any questions? See the [getting started guide](https://github.com/hackforla/311-data/blob/master/GETTING_STARTED.md) From 02def3ea5813e31e4b9867f38696b38577c0dfbd Mon Sep 17 00:00:00 2001 From: sellnat77 Date: Sun, 8 Mar 2020 18:28:37 -0700 Subject: [PATCH 5/6] Allows socrata token override when starting docker image --- server/Dockerfile | 1 + server/src/app.py | 3 +++ 2 files changed, 4 insertions(+) diff --git a/server/Dockerfile b/server/Dockerfile index e97570ca4..4c322d588 100644 --- a/server/Dockerfile +++ b/server/Dockerfile @@ -6,6 +6,7 @@ RUN apt-get update && apt-get install -yq \ ENV DB_CONNECTION_STRING=REDACTED ENV PORT=5000 +ENV TOKEN=None COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt diff --git a/server/src/app.py b/server/src/app.py index 3bd8111bd..4027c1f35 100644 --- a/server/src/app.py +++ b/server/src/app.py @@ -27,6 +27,9 @@ def environment_overrides(): if os.environ.get('PORT', None): app.config['Settings']['Server']['PORT'] =\ os.environ.get('PORT') + if os.environ.get('TOKEN', None): + app.config['Settings']['Socrata']['TOKEN'] =\ + os.environ.get('TOKEN') def configure_app(): From ce85b568e0fdbcab31c7926339e389cf65f18950 Mon Sep 17 00:00:00 2001 From: Jake Mensch Date: Mon, 9 Mar 2020 14:46:20 -0700 Subject: [PATCH 6/6] created aggregationQuery in dataService; using ORM for constructing SQL queries --- server/src/services/dataService.py | 85 ++++++++++++++++----- server/src/services/databaseOrm.py | 11 ++- server/src/services/pinService.py | 16 ++-- server/src/services/requestCountsService.py | 32 +------- server/src/services/requestDetailService.py | 6 +- 5 files changed, 86 insertions(+), 64 deletions(-) diff --git a/server/src/services/dataService.py b/server/src/services/dataService.py index c0ebbd3b0..8749fa8fe 100644 --- a/server/src/services/dataService.py +++ b/server/src/services/dataService.py @@ -2,22 +2,20 @@ import pandas as pd import sqlalchemy as db from sqlalchemy.orm import sessionmaker +from .databaseOrm import Ingest as Request class DataService(object): - def withMeta(dataResponse): - # Will represent last time the ingest pipeline ran - lastPulledTimestamp = datetime.datetime.utcnow() - return {'lastPulled': lastPulledTimestamp, - 'data': dataResponse} - def includeMeta(func): def innerFunc(*args, **kwargs): dataResponse = func(*args, **kwargs) if 'Error' in dataResponse: return dataResponse - - return DataService.withMeta(dataResponse) + # Will represent last time the ingest pipeline ran + lastPulledTimestamp = datetime.datetime.utcnow() + withMeta = {'lastPulled': lastPulledTimestamp, + 'data': dataResponse} + return withMeta return innerFunc @@ -31,21 +29,72 @@ def __init__(self, config=None, tableName="ingest_staging_table"): self.engine = db.create_engine(self.dbString) self.session = sessionmaker(bind=self.engine)() + def standardFilters(self, + startDate=None, + endDate=None, + ncList=[], + requestTypes=[]): + ''' + Generates filters for dates, ncs, and request types. + ''' + + return [ + Request.createddate > startDate if startDate else True, + Request.createddate < endDate if endDate else True, + Request.ncname.in_(ncList) if ncList else True, + Request.requesttype.in_(requestTypes) if requestTypes else True + ] + + @includeMeta + def itemQuery(self, requestNumber): + ''' + Returns a single request by its requestNumber. + ''' + + if not requestNumber or not isinstance(requestNumber, str): + return {'Error': 'Missing request number'} + + return self.session \ + .query(Request) \ + .get(requestNumber) \ + ._asdict() + @includeMeta - def query(self, queryItems=None, queryfilters=[], limit=None): + def query(self, queryItems=[], queryFilters=[], limit=None): + ''' + Returns the specified properties of each request, + after filtering by queryFilters and applying the limit. + ''' + if not queryItems or not isinstance(queryItems, list): return {'Error': 'Missing query items'} - items = ', '.join(queryItems) - query = 'SELECT {} FROM {}'.format(items, self.table) - if queryfilters: - filters = ' AND '.join(queryfilters) - query += ' WHERE {}'.format(filters) - if limit: - query += ' LIMIT {}'.format(limit) + selectFields = [getattr(Request, item) for item in queryItems] + records = self.session \ + .query(*selectFields) \ + .filter(*queryFilters) \ + .limit(limit) \ + .all() + + return [rec._asdict() for rec in records] + + @includeMeta + def aggregateQuery(self, countFields=[], queryFilters=[]): + ''' + Returns the counts of distinct values in the specified fields, + after filtering by queryFilters. + ''' + + if not countFields or not isinstance(countFields, list): + return {'Error': 'Missing count fields'} + + filteredData = self.query(countFields, queryFilters) + df = pd.DataFrame(data=filteredData['data']) - df = pd.read_sql_query(query, con=self.engine) - return df.to_dict(orient='records') + return [{ + 'field': field, + 'counts': df.groupby(by=field).size().to_dict() + } for field in countFields] def storedProc(self): pass diff --git a/server/src/services/databaseOrm.py b/server/src/services/databaseOrm.py index 04a8e48b4..4b03220a7 100644 --- a/server/src/services/databaseOrm.py +++ b/server/src/services/databaseOrm.py @@ -5,7 +5,16 @@ Base = declarative_base() -class Ingest(Base): +class Mixin: + """ + Adds `_asdict()` to easily serialize objects to dictionaries. + """ + def _asdict(self): + cols = self.__table__.columns + return {col.name: getattr(self, col.name) for col in cols} + + +class Ingest(Base, Mixin): __tablename__ = 'ingest_staging_table' srnumber = Column(String(50), primary_key=True, unique=True) createddate = Column(DateTime) diff --git a/server/src/services/pinService.py b/server/src/services/pinService.py index 9754e50b5..ae92dda02 100644 --- a/server/src/services/pinService.py +++ b/server/src/services/pinService.py @@ -6,8 +6,8 @@ def __init__(self, config=None, tableName="ingest_staging_table"): self.dataAccess = DataService(config, tableName) async def get_base_pins(self, - startDate='', - endDate='', + startDate=None, + endDate=None, ncList=[], requestTypes=[]): """ @@ -28,13 +28,7 @@ async def get_base_pins(self, 'latitude', 'longitude'] - ncs = '\'' + '\', \''.join(ncList) + '\'' - requests = '\'' + '\', \''.join(requestTypes) + '\'' + filters = self.dataAccess.standardFilters( + startDate, endDate, ncList, requestTypes) - filters = ['createddate > \'{}\''.format(startDate), - 'createddate < \'{}\''.format(endDate), - 'ncname IN ({})'.format(ncs), - 'requesttype IN ({})'.format(requests)] - result = self.dataAccess.query(items, filters) - - return result + return self.dataAccess.query(items, filters) diff --git a/server/src/services/requestCountsService.py b/server/src/services/requestCountsService.py index 801da309c..0f4b5ee1e 100644 --- a/server/src/services/requestCountsService.py +++ b/server/src/services/requestCountsService.py @@ -1,6 +1,4 @@ -from sqlalchemy import func from .dataService import DataService -from .databaseOrm import Ingest as Request class RequestCountsService(object): @@ -40,31 +38,7 @@ async def get_req_counts(self, } """ - # filter by date, nc, and requestType (if provided) - filters = [ - Request.createddate > startDate if startDate else True, - Request.createddate < endDate if endDate else True, - Request.ncname.in_(ncList) if ncList else True, - Request.requesttype.in_(requestTypes) if requestTypes else True - ] + filters = self.dataAccess.standardFilters( + startDate, endDate, ncList, requestTypes) - data = [] - for field in countFields: - # make sure the field exists in the Request model - if not getattr(Request, field, None): - continue - - # run count/groupby query - results = self.dataAccess.session \ - .query(field, func.count()) \ - .filter(*filters) \ - .group_by(field) \ - .all() - - # add results to data set - data.append({ - 'field': field, - 'counts': dict(results) - }) - - return DataService.withMeta(data) + return self.dataAccess.aggregateQuery(countFields, filters) diff --git a/server/src/services/requestDetailService.py b/server/src/services/requestDetailService.py index a01a20d8b..f8eb0f971 100644 --- a/server/src/services/requestDetailService.py +++ b/server/src/services/requestDetailService.py @@ -25,8 +25,4 @@ async def get_request_detail(self, requestNumber=None): } """ - items = ['*'] - filters = ['srnumber = \'{}\''.format(requestNumber)] - result = self.dataAccess.query(items, filters) - - return result + return self.dataAccess.itemQuery(requestNumber)