Merge pull request #601 from hackforla/550-BACK-PersistingDBConnections
Isolated the db connection in its own module.
sellnat77 authored May 10, 2020
2 parents b415a97 + 6567fbf commit 65963ac
Showing 13 changed files with 73 additions and 55 deletions.
2 changes: 2 additions & 0 deletions server/src/app.py
@@ -21,6 +21,7 @@

from utils.sanic import add_performance_header
from utils.redis import cache
from utils.database import db

app = Sanic(__name__)
CORS(app)
@@ -48,6 +49,7 @@ def configure_app():
if app.config['Settings']['Server']['Debug']:
add_performance_header(app)
cache.config(app.config['Settings']['Redis'])
db.config(app.config['Settings']['Database'])


@app.route('/apistatus')
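For reference, the Database helper introduced further down only reads a DB_CONNECTION_STRING key from the dict it is given, so the Settings → Database section passed here needs at least that entry. A minimal sketch of the startup wiring, with a placeholder connection string rather than the project's real config:

from utils.database import db

settings = {
    'Database': {
        'DB_CONNECTION_STRING': 'postgresql://localhost:5432/some_311_db'  # placeholder
    }
}

db.config(settings['Database'])  # builds the shared engine and sessionmaker once per process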
20 changes: 5 additions & 15 deletions server/src/services/dataService.py
@@ -1,21 +1,11 @@
import datetime
import pandas as pd
import sqlalchemy as db
from sqlalchemy.orm import sessionmaker
from .databaseOrm import Ingest as Request
from utils.database import db
import sqlalchemy as sql


class DataService(object):
def __init__(self, config=None, tableName="ingest_staging_table"):
self.config = config
self.dbString = None if not self.config \
else self.config['Database']['DB_CONNECTION_STRING']

self.table = tableName
self.data = None
self.engine = db.create_engine(self.dbString)
self.Session = sessionmaker(bind=self.engine)

async def lastPulled(self):
# Will represent last time the ingest pipeline ran
return datetime.datetime.utcnow()
@@ -48,7 +38,7 @@ def comparisonFilters(self,
Request.createddate > startDate if startDate else False,
Request.createddate < endDate if endDate else False,
Request.requesttype.in_(requestTypes),
db.or_(Request.nc.in_(ncList), Request.cd.in_(cdList))
sql.or_(Request.nc.in_(ncList), Request.cd.in_(cdList))
]

def itemQuery(self, requestNumber):
@@ -63,7 +53,7 @@ def itemQuery(self, requestNumber):
if 'id' in fields:
fields.remove('id')

session = self.Session()
session = db.Session()
record = session \
.query(*fields) \
.filter(Request.srnumber == requestNumber) \
@@ -86,7 +76,7 @@ def query(self, queryItems=[], queryFilters=[], limit=None):

selectFields = [getattr(Request, item) for item in queryItems]

session = self.Session()
session = db.Session()
records = session \
.query(*selectFields) \
.filter(*queryFilters) \
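With the per-instance engine and sessionmaker removed from DataService, the query methods now borrow sessions from the shared module instead. A rough sketch of the new pattern (assuming the services package context; error handling omitted, and the explicit close is an assumption about caller responsibility, not code from this hunk):

from utils.database import db
from .databaseOrm import Ingest as Request

def example_query(fieldNames, queryFilters, limit=None):
    fields = [getattr(Request, name) for name in fieldNames]
    session = db.Session()  # fresh session from the module-level sessionmaker
    try:
        return session.query(*fields).filter(*queryFilters).limit(limit).all()
    finally:
        session.close()  # assumption: each caller closes the session it opened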
4 changes: 2 additions & 2 deletions server/src/services/frequencyService.py
@@ -5,8 +5,8 @@


class FrequencyService(object):
def __init__(self, config=None, tableName="ingest_staging_table"):
self.dataAccess = DataService(config, tableName)
def __init__(self, config=None):
self.dataAccess = DataService()

def get_bins(self, startDate, endDate):
"""
2 changes: 1 addition & 1 deletion server/src/services/heatmapService.py
@@ -20,7 +20,7 @@ async def get_heatmap(self, filters):

fields = ['latitude', 'longitude']
if pins is None:
dataAccess = DataService(self.config)
dataAccess = DataService()

filters = dataAccess.standardFilters(
filters['startDate'],
2 changes: 1 addition & 1 deletion server/src/services/pinClusterService.py
@@ -20,7 +20,7 @@ def get_pins(self, filters):
pins = cache.get(key)

if pins is None:
dataAccess = DataService(self.config)
dataAccess = DataService()

fields = [
'srnumber',
4 changes: 2 additions & 2 deletions server/src/services/pinService.py
@@ -2,8 +2,8 @@


class PinService(object):
def __init__(self, config=None, tableName="ingest_staging_table"):
self.dataAccess = DataService(config, tableName)
def __init__(self, config=None):
self.dataAccess = DataService()

async def get_base_pins(self,
startDate=None,
4 changes: 2 additions & 2 deletions server/src/services/requestCountsService.py
@@ -2,8 +2,8 @@


class RequestCountsService(object):
def __init__(self, config=None, tableName="ingest_staging_table"):
self.dataAccess = DataService(config, tableName)
def __init__(self, config=None):
self.dataAccess = DataService()

async def get_req_counts(self,
startDate=None,
4 changes: 2 additions & 2 deletions server/src/services/requestDetailService.py
@@ -2,8 +2,8 @@


class RequestDetailService(object):
def __init__(self, config=None, tableName="ingest_staging_table"):
self.dataAccess = DataService(config, tableName)
def __init__(self, config=None):
self.dataAccess = DataService()

async def get_request_detail(self, requestNumber=None):
"""
29 changes: 10 additions & 19 deletions server/src/services/sqlIngest.py
@@ -1,6 +1,4 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import text
from utils.database import db
import time
import json
from .databaseOrm import Ingest, Base
@@ -21,19 +19,16 @@ def end(self):

class DataHandler:
def __init__(self, config=None):
dbString = config['Database']['DB_CONNECTION_STRING']

self.engine = create_engine(dbString)
self.session = sessionmaker(bind=self.engine)()
self.session = db.Session()
self.socrata = SocrataClient(config)

def __del__(self):
self.session.close()

def resetDatabase(self):
log('\nResetting database.')
Base.metadata.drop_all(self.engine)
Base.metadata.create_all(self.engine)
Base.metadata.drop_all(db.engine)
Base.metadata.create_all(db.engine)

def fetchData(self, year, offset, limit):
log('\tFetching {} rows, offset {}'.format(limit, offset))
@@ -74,12 +69,8 @@ def ingestYear(self, year, limit, querySize):
}

def cleanTable(self):
def exec_sql(sql):
with self.engine.connect() as conn:
return conn.execute(text(sql))

def dropDuplicates(table, report):
rows = exec_sql(f"""
rows = db.exec_sql(f"""
DELETE FROM {table} a USING {table} b
WHERE a.id < b.id AND a.srnumber = b.srnumber;
""")
@@ -90,7 +81,7 @@ def dropDuplicates(table, report):
})

def switchPrimaryKey(table, report):
exec_sql(f"""
db.exec_sql(f"""
ALTER TABLE {table} DROP COLUMN id;
ALTER TABLE {table} ADD PRIMARY KEY (srnumber);
""")
@@ -101,7 +92,7 @@ def switchPrimaryKey(table, report):
})

def removeInvalidClosedDates(table, report):
result = exec_sql(f"""
result = db.exec_sql(f"""
UPDATE {table}
SET closeddate = NULL
WHERE closeddate::timestamp < createddate::timestamp;
@@ -113,7 +104,7 @@ def removeInvalidClosedDates(table, report):
})

def setDaysToClose(table, report):
result = exec_sql(f"""
result = db.exec_sql(f"""
UPDATE {table}
SET _daystoclose = EXTRACT (
EPOCH FROM
@@ -128,7 +119,7 @@ def setDaysToClose(table, report):
})

def fixNorthWestwood(table, report):
result = exec_sql(f"""
result = db.exec_sql(f"""
UPDATE {table}
SET nc = 127
WHERE nc = 0 AND ncname = 'NORTH WESTWOOD NC'
@@ -140,7 +131,7 @@ def fixNorthWestwood(table, report):
})

def fixHistoricCulturalNorth(table, report):
result = exec_sql(f"""
result = db.exec_sql(f"""
UPDATE {table}
SET nc = 128
WHERE nc = 0 AND ncname = 'HISTORIC CULTURAL NORTH NC'
4 changes: 2 additions & 2 deletions server/src/services/timeToCloseService.py
@@ -4,8 +4,8 @@


class TimeToCloseService(object):
def __init__(self, config=None, tableName="ingest_staging_table"):
self.dataAccess = DataService(config, tableName)
def __init__(self, config=None):
self.dataAccess = DataService()

def ttc(self, groupField, groupFieldItems, filters):

34 changes: 34 additions & 0 deletions server/src/utils/database.py
@@ -0,0 +1,34 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import text


class Database(object):
def __init__(self, verbose=False):
self.verbose = verbose

def config(self, config):
self.engine = create_engine(config['DB_CONNECTION_STRING'])
self.Session = sessionmaker(bind=self.engine)

if self.verbose:
self.log_connection_events()

def exec_sql(self, sql):
with self.engine.connect() as conn:
return conn.execute(text(sql))

def log_connection_events(self):
def on_checkout(*args, **kwargs):
print('process id {} checkout'.format(os.getpid()), flush=True)

def on_checkin(*args, **kwargs):
print('process id {} checkin'.format(os.getpid()), flush=True)

from sqlalchemy import event
import os
event.listen(self.engine, 'checkout', on_checkout)
event.listen(self.engine, 'checkin', on_checkin)


db = Database()
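The db = Database() line at the bottom makes this a module-level singleton, so every import of utils.database shares one engine and connection pool, which is the point of this PR. The intended lifecycle, sketched with a placeholder connection string: configure once at startup, then reuse Session, engine, and exec_sql everywhere else.

from utils.database import db

db.config({'DB_CONNECTION_STRING': 'postgresql://localhost:5432/some_311_db'})  # placeholder URL

session = db.Session()   # ORM session bound to the single shared engine
# ... run ORM queries ...
session.close()

result = db.exec_sql('SELECT 1;')  # raw SQL through the same engine and pool

One small aside: log_connection_events imports os and event inside the method. The listeners still work because the imports run before event.listen registers them, but top-of-module imports would be the more conventional choice.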
7 changes: 7 additions & 0 deletions server/test/__init__.py
@@ -0,0 +1,7 @@
import sys
from os.path import dirname, abspath, join


# add src directory to path so pytest can find modules
src_dir = join(dirname(abspath(__file__)), '..', 'src')
sys.path.append(src_dir)
12 changes: 3 additions & 9 deletions server/test/test_db_service.py
@@ -1,24 +1,18 @@
from src.services.dataService import DataService

TESTCONFIG = {
"Database": {
"DB_CONNECTION_STRING": "postgresql://testingString/postgresql"
}
}


def test_serviceExists():
# Arrange
# Act
data_worker = DataService(TESTCONFIG)
data_worker = DataService()
# Assert
assert isinstance(data_worker, DataService)


def test_emptyQuery():
# Arrange
queryItems = []
data_worker = DataService(TESTCONFIG)
data_worker = DataService()
# Act
result = data_worker.query(queryItems)
# Assert
@@ -28,7 +22,7 @@ def test_emptyQuery():
def test_nullQuery():
# Arrange
queryItems = None
data_worker = DataService(TESTCONFIG)
data_worker = DataService()
# Act
result = data_worker.query(queryItems)
# Assert