/pin-clusters with redis (plus /heatmap) #574

Merged: 4 commits, May 4, 2020
Changes shown below are from 2 of the 4 commits.
16 changes: 16 additions & 0 deletions Orchestration/docker-compose-example.yml
@@ -47,5 +47,21 @@ services:
container_name: "postgres-dashboard"
ports:
- 8080:8080

redis:
build:
context: ../redis
container_name: "311-redis"
expose:
- 6379

rebrow:
image: marian/rebrow
links:
- redis:redis
container_name: "redis-dashboard"
ports:
- 5001:5001

volumes:
backend_data:
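Since the redis service only exposes 6379 (rather than publishing it to the host), it is reachable from sibling containers on the compose network but not from the host machine. A minimal connectivity check from inside another container, assuming the service names above:

import redis  # redis-py, pinned as redis==3.5.0 in server/requirements.txt

r = redis.Redis(host='redis', port=6379)
print(r.ping())  # True once the 311-redis container is up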
3 changes: 3 additions & 0 deletions redis/Dockerfile
@@ -0,0 +1,3 @@
FROM redis
COPY redis.conf /usr/local/etc/redis/redis.conf
CMD [ "redis-server", "/usr/local/etc/redis/redis.conf" ]
43 changes: 43 additions & 0 deletions redis/redis.conf
@@ -0,0 +1,43 @@
# full example config here:
# http://download.redis.io/redis-stable/redis.conf

# discussion of memory policies here:
# https://redis.io/topics/lru-cache

# how to check that this config is working inside docker:
# 1. login to container: `docker exec -it 311-redis /bin/bash`
# 2. start the redis cli: `redis-cli`
# 3. check the config: `config get maxmemory-policy`

# MAXMEMORY POLICY: how Redis will select what to remove when maxmemory
# is reached. You can select one from the following behaviors:
#
# volatile-lru -> Evict using approximated LRU, only keys with an expire set.
# allkeys-lru -> Evict any key using approximated LRU.
# volatile-lfu -> Evict using approximated LFU, only keys with an expire set.
# allkeys-lfu -> Evict any key using approximated LFU.
# volatile-random -> Remove a random key having an expire set.
# allkeys-random -> Remove a random key, any key.
# volatile-ttl -> Remove the key with the nearest expire time (minor TTL)
# noeviction -> Don't evict anything, just return an error on write operations.
#
# LRU means Least Recently Used
# LFU means Least Frequently Used
#
# Both LRU, LFU and volatile-ttl are implemented using approximated
# randomized algorithms.
#
# Note: with any of the above policies, Redis will return an error on write
# operations, when there are no suitable keys for eviction.
#
# At the date of writing these commands are: set setnx setex append
# incr decr rpush lpush rpushx lpushx linsert lset rpoplpush sadd
# sinter sinterstore sunion sunionstore sdiff sdiffstore zadd zincrby
# zunionstore zinterstore hset hsetnx hmset hincrby incrby decrby
# getset mset msetnx exec sort
#
# The default is:
#
# maxmemory-policy noeviction

maxmemory-policy volatile-lfu
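The comments at the top of this file describe checking the live policy with redis-cli; the same check can be scripted with redis-py (a sketch, assuming the container hostname from the compose file):

import redis

r = redis.Redis(host='redis', port=6379)
# config_get returns a dict, e.g. {'maxmemory-policy': 'volatile-lfu'}
print(r.config_get('maxmemory-policy'))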
2 changes: 1 addition & 1 deletion server/Dockerfile
@@ -1,7 +1,7 @@
FROM python:3.7-slim

RUN apt-get update && apt-get install -yq \
python3 python3-dev gcc \
python3 python3-dev gcc g++ \
gfortran musl-dev

ENV DB_CONNECTION_STRING=REDACTED
Expand Down
270 changes: 270 additions & 0 deletions server/performanceStatistics/PinsComparisonTest.jmx

Large diffs are not rendered by default.

181 changes: 181 additions & 0 deletions server/performanceStatistics/pins-comparison/pins-comparison.jtl

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions server/requirements.txt
@@ -24,9 +24,11 @@ py==1.8.1
pycodestyle==2.5.0
pyflakes==2.1.1
pyparsing==2.4.6
pysupercluster==0.7.6
pytest==5.3.3
python-dateutil==2.8.1
pytz==2019.3
redis==3.5.0
requests==2.23.0
requests-async==0.5.0
rfc3986==1.3.2
41 changes: 41 additions & 0 deletions server/src/app.py
@@ -11,13 +11,16 @@
from services.timeToCloseService import TimeToCloseService
from services.frequencyService import FrequencyService
from services.pinService import PinService
from services.pinClusterService import PinClusterService
from services.heatmapService import HeatmapService
from services.requestCountsService import RequestCountsService
from services.requestDetailService import RequestDetailService
from services.sqlIngest import DataHandler
from services.feedbackService import FeedbackService
from services.dataService import DataService

from utils.sanic import add_performance_header
from utils.redis import cache

app = Sanic(__name__)
CORS(app)
Expand All @@ -44,6 +47,7 @@ def configure_app():
os.makedirs(os.path.join(app.config["STATIC_DIR"], "temp"), exist_ok=True)
if app.config['Settings']['Server']['Debug']:
add_performance_header(app)
cache.config(app.config['Settings']['Redis'])


@app.route('/apistatus')
@@ -222,6 +226,43 @@ async def pinMap(request):
return json(return_data)


@app.route('/pin-clusters', methods=["POST"])
@compress.compress()
async def pinClusters(request):
worker = PinClusterService(app.config['Settings'])

postArgs = request.json
filters = {
'startDate': postArgs.get('startDate', None),
'endDate': postArgs.get('endDate', None),
'requestTypes': postArgs.get('requestTypes', []),
'ncList': postArgs.get('ncList', [])
}
zoom = int(postArgs.get('zoom', 0))
bounds = postArgs.get('bounds', {})
options = postArgs.get('options', {})

clusters = await worker.get_pin_clusters(filters, zoom, bounds, options)
return json(clusters)


@app.route('/heatmap', methods=["POST"])
@compress.compress()
async def heatmap(request):
worker = HeatmapService(app.config['Settings'])

postArgs = request.json
filters = {
'startDate': postArgs.get('startDate', None),
'endDate': postArgs.get('endDate', None),
'requestTypes': postArgs.get('requestTypes', []),
'ncList': postArgs.get('ncList', [])
}

heatmap = await worker.get_heatmap(filters)
return json(heatmap)


@app.route('/requestcounts', methods=["POST"])
@compress.compress()
async def requestCounts(request):
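For reference, a client-side sketch of the two new endpoints. The payload fields mirror the postArgs parsing above; the base URL, request type, and NC id are illustrative assumptions:

import requests

BASE = 'http://localhost:5000'  # assumed dev server address

payload = {
    'startDate': '2020-01-01',
    'endDate': '2020-03-01',
    'requestTypes': ['Bulky Items'],  # hypothetical request type
    'ncList': [52],                   # hypothetical NC id
    'zoom': 12,
    'bounds': {'north': 34.2, 'south': 33.9, 'west': -118.5, 'east': -118.1},
    'options': {'radius': 200},
}

clusters = requests.post(BASE + '/pin-clusters', json=payload).json()
heatmap = requests.post(BASE + '/heatmap', json=payload).json()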
36 changes: 36 additions & 0 deletions server/src/services/heatmapService.py
@@ -0,0 +1,36 @@
import pandas as pd
import hashlib
import json
from utils.redis import cache
from .dataService import DataService


class HeatmapService(object):
def __init__(self, config=None):
self.config = config

def pins_key(self, filters):
filters_json = json.dumps(filters, sort_keys=True).encode('utf-8')
hashed_json = hashlib.md5(filters_json).hexdigest()
return 'filters:{}:pins'.format(hashed_json)

async def get_heatmap(self, filters):
key = self.pins_key(filters)
pins = cache.get(key)

fields = ['latitude', 'longitude']
if pins is None:
dataAccess = DataService(self.config)

filters = dataAccess.standardFilters(
filters['startDate'],
filters['endDate'],
filters['requestTypes'],
filters['ncList'])

pins = dataAccess.query(fields, filters)
pins = pd.DataFrame(pins, columns=fields)
else:
pins = pins[fields]

return pins.to_numpy()
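Both HeatmapService and PinClusterService derive the cache key by hashing the canonicalized filter dict, so logically equal filters map to the same 'filters:{}:pins' key regardless of dict ordering. That is what lets the heatmap reuse pins cached by a /pin-clusters call; note that on a miss this service queries the database directly and, in this revision, leaves the cache write to the cluster service. A quick sketch of the key determinism:

import hashlib
import json

def pins_key(filters):
    filters_json = json.dumps(filters, sort_keys=True).encode('utf-8')
    return 'filters:{}:pins'.format(hashlib.md5(filters_json).hexdigest())

a = {'startDate': '2020-01-01', 'endDate': None, 'requestTypes': [], 'ncList': []}
b = {'ncList': [], 'requestTypes': [], 'endDate': None, 'startDate': '2020-01-01'}
assert pins_key(a) == pins_key(b)  # same key despite different insertion order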
81 changes: 81 additions & 0 deletions server/src/services/pinClusterService.py
@@ -0,0 +1,81 @@
import pysupercluster
import pandas as pd
import hashlib
import json
from utils.redis import cache
from .dataService import DataService


class PinClusterService(object):
def __init__(self, config=None):
self.config = config

def pins_key(self, filters):
filters_json = json.dumps(filters, sort_keys=True).encode('utf-8')
hashed_json = hashlib.md5(filters_json).hexdigest()
return 'filters:{}:pins'.format(hashed_json)

def get_pins(self, filters):
key = self.pins_key(filters)
pins = cache.get(key)

if pins is None:
dataAccess = DataService(self.config)

fields = [
'srnumber',
'requesttype',
'latitude',
'longitude']

filters = dataAccess.standardFilters(
filters['startDate'],
filters['endDate'],
filters['requestTypes'],
filters['ncList'])

pins = dataAccess.query(fields, filters)
pins = pd.DataFrame(pins, columns=fields)

cache.set(key, pins)

return pins

def pin_clusters(self, pins, zoom, bounds, options={}):
if len(pins) == 0:
return []

min_zoom = options.get('min_zoom', 0)
max_zoom = options.get('max_zoom', 17)
radius = options.get('radius', 200)
extent = options.get('extent', 512)

index = pysupercluster.SuperCluster(
pins[['longitude', 'latitude']].to_numpy(),
min_zoom=min_zoom,
max_zoom=max_zoom,
radius=radius,
extent=extent)

north = bounds.get('north', 90)
south = bounds.get('south', -90)
west = bounds.get('west', -180)
east = bounds.get('east', 180)

clusters = index.getClusters(
top_left=(west, north),
bottom_right=(east, south),
zoom=zoom)

for cluster in clusters:
if cluster['count'] == 1:
pin = pins.iloc[cluster['id']]
cluster['srnumber'] = pin['srnumber']
cluster['requesttype'] = pin['requesttype']
del cluster['expansion_zoom']

return clusters

async def get_pin_clusters(self, filters, zoom, bounds, options):
pins = self.get_pins(filters)
return self.pin_clusters(pins, zoom, bounds, options)
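A self-contained sketch of the clustering step, mirroring the pysupercluster calls above (the points and bounds are made up; pysupercluster compiles a C++ extension, which is presumably why g++ was added to the server Dockerfile in this PR):

import numpy as np
import pysupercluster

points = np.array([   # (longitude, latitude) pairs, as in the service
    [-118.24, 34.05],
    [-118.25, 34.06],
    [-118.40, 34.20],
])

index = pysupercluster.SuperCluster(
    points, min_zoom=0, max_zoom=17, radius=200, extent=512)

# Same bounds convention as above: top_left=(west, north), bottom_right=(east, south)
clusters = index.getClusters(top_left=(-180, 90), bottom_right=(180, -90), zoom=10)
print(clusters)  # list of dicts with id, count, latitude, longitude, ...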
4 changes: 4 additions & 0 deletions server/src/settings.example.cfg
@@ -37,3 +37,7 @@ GITHUB_TOKEN = REDACTED
ISSUES_URL = https://api.github.com/repos/hackforla/311-data/issues
PROJECT_URL = REDACTED
GITHUB_SHA = DEVELOPMENT

[Redis]
ENABLED = True
Member:
Can we default this to false?
Also, is there a url for redis? I'm guessing it's defaulting to localhost:6379.
In prod we will have an environment variable override for this.

OOORRRRR we can base ENABLED off of whether a redis url was provided or not; that way we kill two birds with one stone.

Contributor Author:
Yeah, I can change that.

For the url, right now it's using 'redis' as the host, which I guess is the hostname of the redis service within the docker compose network... if I understand docker correctly. And the port is 6379. I'll figure out how to set this up as an env variable.

Member:
Ohhh okay yeah... makes sense.
Within the docker env it'll be exposed as both redis:6379 and localhost:6379, so either way we want to be explicit with how we address redis so we can override it with heroku's fn8934fh3o8g3o3qhg893gg.heroku.com:6379.

Contributor Author:
Ok cool, this is done.

TTL_SECONDS = 3600
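Assuming the settings file is loaded with configparser (consistent with the string comparison in utils/redis.py below), both values arrive as strings, hence the == 'True' check and the int() cast:

import configparser

parser = configparser.ConfigParser()
parser.read('settings.cfg')  # assumed path

redis_cfg = parser['Redis']
print(redis_cfg['ENABLED'] == 'True')  # configparser values are strings
print(int(redis_cfg['TTL_SECONDS']))   # 3600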
34 changes: 34 additions & 0 deletions server/src/utils/redis.py
@@ -0,0 +1,34 @@
import redis
import pickle
from datetime import timedelta


class RedisCache(object):
def config(self, config):
self.enabled = config['ENABLED'] == 'True'
if self.enabled:
self.ttl = int(config['TTL_SECONDS'])
self.r = redis.Redis(host='redis')

def get(self, key):
if not self.enabled:
return None

value = self.r.get(key)
if value is None:
return None
else:
return pickle.loads(value)

def set(self, key, value):
if not self.enabled:
return None

value = pickle.dumps(value)
try:
self.r.setex(key, timedelta(seconds=self.ttl), value)
except Exception as e:
print(e)


cache = RedisCache()
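The module exports a single shared instance, so callers import cache and call config() once at startup, as app.py does. A usage sketch, assuming a redis server reachable at host 'redis':

from utils.redis import cache

cache.config({'ENABLED': 'True', 'TTL_SECONDS': '3600'})

cache.set('filters:abc123:pins', {'any': 'picklable value'})
print(cache.get('filters:abc123:pins'))  # round-trips through pickle
# entries expire after TTL_SECONDS, enforced via setex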
11 changes: 7 additions & 4 deletions server/src/utils/sanic.py
@@ -11,10 +11,13 @@ async def request(req):
req.ctx.start = time.perf_counter()

async def response(req, res):
duration = time.perf_counter() - req.ctx.start
res.headers['x-performance'] = json.dumps({
'executionTime': round(duration, 4)
})
try:
duration = time.perf_counter() - req.ctx.start
res.headers['x-performance'] = json.dumps({
'executionTime': round(duration, 4)
})
except Exception:
pass

sanic_app.register_middleware(request, attach_to='request')
sanic_app.register_middleware(response, attach_to='response')
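When the Debug setting is on, every response carries an x-performance header; a client can read it like so (a sketch, server address assumed):

import json
import requests

res = requests.get('http://localhost:5000/apistatus')  # assumed dev address
perf = json.loads(res.headers['x-performance'])
print(perf['executionTime'])  # seconds, rounded to 4 decimal places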