diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 000000000..22b934d44 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,2 @@ +*.csv filter=lfs diff=lfs merge=lfs -text +*.tsv filter=lfs diff=lfs merge=lfs -text diff --git a/server/requirements.txt b/server/requirements.txt index 5067ec730..83d114430 100644 --- a/server/requirements.txt +++ b/server/requirements.txt @@ -13,10 +13,17 @@ itsdangerous==1.1.0 Jinja2==2.10.3 MarkupSafe==1.1.1 multidict==4.5.2 +numpy==1.17.4 +pandas==0.25.3 +psycopg2==2.8.4 +python-dateutil==2.8.1 +pytz==2019.3 requests==2.22.0 requests-async==0.5.0 rfc3986==1.3.2 sanic==19.9.0 +six==1.13.0 +SQLAlchemy==1.3.11 ujson==1.35 urllib3==1.25.7 uvloop==0.14.0 diff --git a/server/src/app.py b/server/src/app.py index 0167e55f2..86e9f6f92 100644 --- a/server/src/app.py +++ b/server/src/app.py @@ -4,14 +4,24 @@ from services.time_to_close import time_to_close from services.frequency import frequency from services.ingress_service import ingress_service +from configparser import ConfigParser + app = Sanic(__name__) -app.config.from_pyfile(os.path.join(os.getcwd(),'settings.cfg')) + +def configure_app(): + # Settings initialization + config = ConfigParser() + settings_file = os.path.join(os.getcwd(),'settings.cfg') + config.read(settings_file) + app.config['Settings'] = config + @app.route('/') async def index(request): return json('You hit the index') + @app.route('/timetoclose') async def timetoclose(request): ttc_worker = time_to_close() @@ -20,6 +30,7 @@ async def timetoclose(request): return json(return_data) + @app.route('/requestfrequency') async def requestfrequency(request): freq_worker = frequency() @@ -28,28 +39,44 @@ async def requestfrequency(request): return json(return_data) + @app.route('/sample-data') async def sample_route(request): sample_dataset = {'cool_key':['value1', 'value2'], app.config['REDACTED']:app.config['REDACTED']} return json(sample_dataset) -@app.route('/injest') -async def injest(request): - ingress_worker = ingress_service() - return_data = ingress_worker.injest() + +@app.route('/ingest', methods=["POST"]) +async def ingest(request): + '''Accept POST requests with a list of datasets to import\ + based on the YearMapping. Body parameter format is \ + {"sets": ["YearMappingKey","YearMappingKey","YearMappingKey"]}''' + + ingress_worker = ingress_service(config=app.config['Settings']) + return_data = {'response':'ingest ok'} + + for dataSet in request.json.get("sets", None): + target_data = app.config["Settings"]["YearMapping"][dataSet] + return_data = await ingress_worker.ingest(from_dataset=target_data) + return json(return_data) + @app.route('/update') async def update(request): ingress_worker = ingress_service() return_data = ingress_worker.update() return json(return_data) + @app.route('/delete') async def delete(request): ingress_worker = ingress_service() return_data = ingress_worker.delete() return json(return_data) + + if __name__ == '__main__': - app.run(host=app.config['HOST'], port=app.config['PORT'], debug=app.config['DEBUG']) + configure_app() + app.run(host=app.config['Settings']['Server']['HOST'], port=app.config['Settings']['Server']['PORT'], debug=app.config['Settings']['Server']['DEBUG']) diff --git a/server/src/services/ingress_service.py b/server/src/services/ingress_service.py index b7c9970d8..462e8d8f2 100644 --- a/server/src/services/ingress_service.py +++ b/server/src/services/ingress_service.py @@ -1,9 +1,16 @@ +from .sqlIngest import DataHandler + class ingress_service(object): - def __init__(self): - pass + def __init__(self, config=None): + self.config = config + - def injest(self): - return {'response':'injest ok'} + async def ingest(self, from_dataset=None): + loader = DataHandler(config=self.config) + loader.loadData(fileName=from_dataset) + loader.cleanData() + loader.ingestData() + return {'response':'ingest ok'} def update(self): return {'response':'update ok'} @@ -13,3 +20,10 @@ def delete(self): def hello_world(self): return {'response':'hello from frequency service'} + +if __name__ == "__main__": + from configparser import ConfigParser + config = ConfigParser() + config.read('../settings.cfg') + worker = ingress_service(config = config) + worker.ingest() diff --git a/server/src/sqlIngest.py b/server/src/services/sqlIngest.py similarity index 85% rename from server/src/sqlIngest.py rename to server/src/services/sqlIngest.py index 73498b2b5..98aa6abcc 100644 --- a/server/src/sqlIngest.py +++ b/server/src/services/sqlIngest.py @@ -1,3 +1,4 @@ +import os from sqlalchemy.types import Integer, Text, String, DateTime, Float from sqlalchemy import create_engine import pandas as pd @@ -6,26 +7,40 @@ import logging class DataHandler: - def __init__(self): + def __init__(self, config=None, configFilePath=None, separator=','): self.data = None - self.config = None - self.dbString = None - self.csvPath = None - self.configFilePath = None + self.config = config + self.dbString = None if not self.config else self.config['Database']['DB_CONNECTION_STRING'] + self.filePath = None + self.configFilePath = configFilePath + self.separator = separator + + def loadConfig(self, configFilePath): '''Load and parse config data''' + if self.config: + print('Config already exists at %s. Nothing to load.' % self.configFilePath) + return + print('Loading config file %s' % self.configFilePath) self.configFilePath = configFilePath config = ConfigParser() config.read(configFilePath) self.config = config - self.dbString = config['Main']['DB_CONNECTION_STRING'] - self.csvPath = "%s311data.tsv" % (config['Main']['CSV_DIRECTORY']) - def loadData(self): + self.dbString = config['Database']['DB_CONNECTION_STRING'] + + + def loadData(self, fileName="311data"): '''Load dataset into pandas object''' - print('Loading dataset %s' % self.csvPath) - self.data = pd.read_table(self.csvPath, - sep='\t', + if self.separator == ',': + dataFile = fileName + ".csv" + else: + dataFile = fileName + ".tsv" + + self.filePath = os.path.join(self.config['Database']['DATA_DIRECTORY'], dataFile ) + print('Loading dataset %s' % self.filePath) + self.data = pd.read_table(self.filePath, + sep=self.separator, na_values=['nan'], dtype={ 'SRNumber':str, @@ -62,6 +77,7 @@ def loadData(self): 'NCName':str, 'PolicePrecinct':str }) + def cleanData(self): '''Perform general data filtering''' print('Cleaning 311 dataset...') @@ -80,12 +96,13 @@ def cleanData(self): data['service_created'] = data.ServiceDate-data.CreatedDate # drop NA values and reformat closed_created in units of hours data = data[~data.closed_created.isna()] - # New column: closed_created in units of days + # New column: closed_created in units of days data['closed_createdD'] = data.closed_created / pd.Timedelta(days=1) # xFUTURE: Geolocation/time clustering to weed out repeat requests # xFUTURE: Decide whether ServiceDate or ClosedDate are primary metric # xFUTURE: Removal of feedback and other categories self.data = data + def ingestData(self): '''Set up connection to database''' print('Inserting data into Postgres instance...') @@ -140,10 +157,7 @@ def ingestData(self): if __name__ == "__main__": loader = DataHandler() - loader.loadConfig('settings.cfg') + loader.loadConfig('../settings.cfg') loader.loadData() loader.cleanData() loader.ingestData() - - - diff --git a/server/src/settings.example.cfg b/server/src/settings.example.cfg index 992d8d3ff..b66018e5e 100644 --- a/server/src/settings.example.cfg +++ b/server/src/settings.example.cfg @@ -1,6 +1,15 @@ -[Main] +[Server] DEBUG = True HOST = 0.0.0.0 PORT = 5000 -DB_CONNECTION_STRING = postgres://REDACTED:REDACTED@somehost/postgres + +[Database] +DB_CONNECTION_STRING = postgres://REDACTED:REDACTED@localhost:5432/postgres +DATA_DIRECTORY = . + +[Api] REDACTED = REDACTED + +[YearMapping] +2018_MINI = 2018_mini +2018_FULL = 311data diff --git a/server/src/static/2018_full.csv b/server/src/static/2018_full.csv new file mode 100644 index 000000000..1c29cdbcf --- /dev/null +++ b/server/src/static/2018_full.csv @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:99801a8dcab82bdefacc4d523db28688b202490a3169add98bd3cdd8a841e857 +size 418716116 diff --git a/server/src/static/2018_mini.csv b/server/src/static/2018_mini.csv new file mode 100644 index 000000000..7a5ee8306 --- /dev/null +++ b/server/src/static/2018_mini.csv @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:3575c4ee7aff80f9c11c50c4e3063141b57ed511240751d66ce37dd00242c98e +size 703644