diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md new file mode 100644 index 000000000..739b02194 --- /dev/null +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -0,0 +1,8 @@ +Fixes #{issue number here} + + - [ ] Up to date with `dev` branch + - [ ] Branch name follows [guidelines](https://github.com/hackforla/311-data/blob/master/GETTING_STARTED.md#feature-branching) + - [ ] All PR Status checks are successful + - [ ] Peer reviewed and approved + +Any questions? See the [getting started guide](https://github.com/hackforla/311-data/blob/master/GETTING_STARTED.md) diff --git a/server/Dockerfile b/server/Dockerfile index e97570ca4..4c322d588 100644 --- a/server/Dockerfile +++ b/server/Dockerfile @@ -6,6 +6,7 @@ RUN apt-get update && apt-get install -yq \ ENV DB_CONNECTION_STRING=REDACTED ENV PORT=5000 +ENV TOKEN=None COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt diff --git a/server/src/app.py b/server/src/app.py index 3bd8111bd..43610055e 100644 --- a/server/src/app.py +++ b/server/src/app.py @@ -11,6 +11,7 @@ from services.time_to_close import time_to_close from services.frequency import frequency from services.pinService import PinService +from services.requestCountsService import RequestCountsService from services.requestDetailService import RequestDetailService from services.ingress_service import ingress_service from services.sqlIngest import DataHandler @@ -27,6 +28,9 @@ def environment_overrides(): if os.environ.get('PORT', None): app.config['Settings']['Server']['PORT'] =\ os.environ.get('PORT') + if os.environ.get('TOKEN', None): + app.config['Settings']['Socrata']['TOKEN'] =\ + os.environ.get('TOKEN') def configure_app(): @@ -140,6 +144,25 @@ async def pinMap(request): return json(return_data) +@app.route('/requestcounts', methods=["POST"]) +@compress.compress() +async def requestCounts(request): + counts_worker = RequestCountsService(app.config['Settings']) + postArgs = request.json + start = 
def standardFilters(self,
                    startDate=None,
                    endDate=None,
                    ncList=None,
                    requestTypes=None):
    '''
    Generates filters for dates, ncs, and request types.

    Returns a list of four entries meant to be unpacked into
    `Query.filter(*filters)`. Each entry is either a comparison on a
    `Request` column or the literal True when the matching argument is
    empty/None, i.e. "do not restrict on this column".
    '''
    # None defaults replace the shared mutable `[]` defaults; the
    # truthiness checks below treat None and [] identically.
    return [
        Request.createddate > startDate if startDate else True,
        Request.createddate < endDate if endDate else True,
        Request.ncname.in_(ncList) if ncList else True,
        Request.requesttype.in_(requestTypes) if requestTypes else True
    ]


@includeMeta
def itemQuery(self, requestNumber):
    '''
    Returns a single request by its requestNumber.

    Returns an {'Error': ...} dict when requestNumber is missing, is
    not a string, or does not match any stored request.
    '''
    if not requestNumber or not isinstance(requestNumber, str):
        return {'Error': 'Missing request number'}

    record = self.session.query(Request).get(requestNumber)

    # Query.get() yields None for an unknown primary key; report that
    # instead of failing with AttributeError on None._asdict().
    if record is None:
        return {'Error': 'Request not found'}

    return record._asdict()


@includeMeta
def query(self, queryItems=None, queryFilters=None, limit=None):
    '''
    Returns the specified properties of each request,
    after filtering by queryFilters and applying the limit.

    queryItems:   list of attribute names on Request to select.
    queryFilters: list of filter expressions (see standardFilters);
                  None or empty means no filtering.
    limit:        maximum number of rows, or None for no limit.

    Returns a list of dicts (one per row), or an {'Error': ...} dict
    when queryItems is missing or names an unknown attribute.
    '''
    if not queryItems or not isinstance(queryItems, list):
        return {'Error': 'Missing query items'}

    # Item names can originate from request payloads, so an unknown or
    # non-string name must produce an error response, not an exception.
    try:
        selectFields = [getattr(Request, item) for item in queryItems]
    except (AttributeError, TypeError):
        return {'Error': 'Invalid query items'}

    records = self.session \
        .query(*selectFields) \
        .filter(*(queryFilters or [])) \
        .limit(limit) \
        .all()

    return [rec._asdict() for rec in records]


@includeMeta
def aggregateQuery(self, countFields=None, queryFilters=None):
    '''
    Returns the counts of distinct values in the specified fields,
    after filtering by queryFilters.

    Returns a list with one {'field': name, 'counts': {value: n}} dict
    per entry of countFields, or an {'Error': ...} dict when
    countFields is missing or the underlying query fails.
    '''
    if not countFields or not isinstance(countFields, list):
        return {'Error': 'Missing count fields'}

    filteredData = self.query(countFields, queryFilters or [])

    # query() hands back an error dict without a 'data' key on failure;
    # pass it through rather than raising KeyError below.
    if 'Error' in filteredData:
        return filteredData

    # Naming the columns explicitly keeps groupby() working when no rows
    # matched (an empty frame would otherwise have no columns at all).
    # dict.fromkeys de-duplicates while preserving order.
    columns = list(dict.fromkeys(countFields))
    df = pd.DataFrame(data=filteredData['data'], columns=columns)

    return [{
        'field': field,
        'counts': df.groupby(by=field).size().to_dict()
    } for field in countFields]
class RequestCountsService(object):
    """
    Computes per-field value counts over requests by delegating to a
    DataService instance.
    """

    def __init__(self, config=None, tableName="ingest_staging_table"):
        self.dataAccess = DataService(config, tableName)

    async def get_req_counts(self,
                             startDate=None,
                             endDate=None,
                             ncList=None,
                             requestTypes=None,
                             countFields=None):
        """
        For each countField, returns the counts of each distinct value
        in that field, given times, ncs, and request filters.
        E.g. if countFields is ['requesttype', 'requestsource'], returns:
        {
            'lastPulled': 'Timestamp',
            'data': [
                {
                    'field': 'requesttype',
                    'counts': {
                        'Graffiti Removal': 'Int',
                        'Bulky Items': 'Int',
                        ...
                    }
                },
                {
                    'field': 'requestsource',
                    'counts': {
                        'Mobile App': 'Int',
                        'Driver Self Report': 'Int',
                        ...
                    }
                }
            ]
        }

        Omitted (or None) list arguments are treated as empty lists.
        """
        # Fresh lists per call: the defaults are None instead of shared
        # mutable `[]` objects, and the data layer still receives lists.
        ncList = [] if ncList is None else ncList
        requestTypes = [] if requestTypes is None else requestTypes
        countFields = [] if countFields is None else countFields

        filters = self.dataAccess.standardFilters(
            startDate, endDate, ncList, requestTypes)

        return self.dataAccess.aggregateQuery(countFields, filters)