This repository has been archived by the owner on Jan 12, 2019. It is now read-only.

Commit

Merge pull request #6 from mozilla-services/feat/refactor-lambda-class
feat: refactor handler into singleton class
jrconlin committed Mar 22, 2016
2 parents b57ac42 + ba2666c commit b4fcf4e
Showing 2 changed files with 80 additions and 90 deletions.
132 changes: 64 additions & 68 deletions push_processor/handler.py
@@ -15,71 +15,67 @@
from push_processor.processor.pubkey import PubKeyProcessor


here_dir = os.path.abspath(os.path.dirname(__file__))


redis_server = None
settings = None


def aws_lambda(event, context):
    if "Bucket" in event:
        # S3 test event, return
        return "Test event"

    # Check that we have a record
    if "Records" not in event:
        return "No record found in event"

    # Setup local objects if needed
    if not redis_server:
        setup_objects()

    pub_keys = get_all_keys(settings["db_tablename"])
    processor = PubKeyProcessor(pub_keys)
    use_gzip = settings.get("use_gzip", False)

    for record in event["Records"]:
        s3_obj = record["s3"]
        bucket, key = s3_obj["bucket"]["name"], s3_obj["object"]["key"]
        f = s3_open(bucket, key, use_gzip=use_gzip)
        if settings["file_type"] == "heka":
            process_heka_stream(redis_server, processor, f)
        else:
            process_json_stream(redis_server, processor, f)


def setup_objects():
    """Sets up module globals for handler run based on S3 config
    Reads a local file to the package, which is hard-coded on bundling
    for S3 to reference runtime options
    """
    global redis_server, settings
    with open(os.path.join(here_dir, "settings.js")) as f:
        settings = json.load(f)

    # Load S3 config settings if supplied
    if "s3_bucket" in settings:
        f = s3_open(settings["s3_bucket"], settings["s3_key"])
        s3_config = json.loads(f.read())
        settings.update(s3_config)

    redis_server = redis.StrictRedis(
        host=settings["redis_host"],
        port=settings["redis_port"]
    )


def process_heka_stream(redis_server, processor, stream):
    reader = read_heka_file_stream(stream)
    for msg in reader:
        processor.process_message(msg)
    dump_latest_messages_to_redis(redis_server, processor.latest_messages)


def process_json_stream(redis_server, processor, stream):
    for msg in eventsFromJSONLogFile(stream):
        processor.process_message(Message(json=msg))
    dump_latest_messages_to_redis(redis_server, processor.latest_messages)
class Lambda(object):
    __instance = None

    def __new__(cls):
        """Singleton instance to avoid repeat setup"""
        if Lambda.__instance is None:
            Lambda.__instance = object.__new__(cls)
        return Lambda.__instance

    def __init__(self):
        here_dir = os.path.abspath(os.path.dirname(__file__))
        with open(os.path.join(here_dir, "settings.js")) as f:
            settings = json.load(f)

        # Load S3 config settings if supplied
        if "s3_bucket" in settings:
            f = s3_open(settings["s3_bucket"], settings["s3_key"])
            s3_config = json.loads(f.read())
            settings.update(s3_config)

        self.redis_server = redis.StrictRedis(
            host=settings["redis_host"],
            port=settings["redis_port"]
        )
        self.settings = settings

    @classmethod
    def handler(cls, event, context):
        return cls().handle_event(event, context)

    def handle_event(self, event, context):
        if "Bucket" in event:
            # S3 test event, return
            return "Test event"

        # Check that we have a record
        if "Records" not in event:
            return "No record found in event"

        pub_keys = get_all_keys(self.settings["db_tablename"])
        processor = PubKeyProcessor(pub_keys)
        use_gzip = self.settings.get("use_gzip", False)

        for record in event["Records"]:
            s3_obj = record["s3"]
            bucket, key = s3_obj["bucket"]["name"], s3_obj["object"]["key"]
            f = s3_open(bucket, key, use_gzip=use_gzip)
            if self.settings["file_type"] == "heka":
                self.process_heka_stream(processor, f)
            else:
                self.process_json_stream(processor, f)

    def process_heka_stream(self, processor, stream):
        reader = read_heka_file_stream(stream)
        for msg in reader:
            processor.process_message(msg)
        dump_latest_messages_to_redis(self.redis_server,
                                      processor.latest_messages)

    def process_json_stream(self, processor, stream):
        for msg in eventsFromJSONLogFile(stream):
            processor.process_message(Message(json=msg))
        dump_latest_messages_to_redis(self.redis_server,
                                      processor.latest_messages)
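
A usage note on the new entry point (this is not part of the diff itself): AWS calls Lambda.handler as a classmethod, which constructs the shared singleton and delegates to handle_event. Below is a minimal local sketch, assuming the bundled settings.js exists, a Redis server is reachable, and the referenced S3 object can be read; the bucket and key names are hypothetical.

from push_processor.handler import Lambda

# __new__ always hands back the same shared instance.
assert Lambda() is Lambda()

# Shape of the S3 notification event that handle_event expects.
event = {
    "Records": [
        {"s3": {"bucket": {"name": "push-test"},
                "object": {"key": "push_dash_logs.json"}}}
    ]
}

# AWS invokes this classmethod as the configured handler.
Lambda.handler(event, None)
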
38 changes: 16 additions & 22 deletions push_processor/tests/test_handler.py
@@ -7,7 +7,7 @@

import redis
from mock import Mock, patch
from nose.tools import eq_, ok_
from nose.tools import eq_

import push_messages.tests as pmtests

@@ -40,25 +40,17 @@ def tearDown():


class TestHandler(unittest.TestCase):
    def test_setup(self):
        import push_processor.handler as handler
        handler.setup_objects()
        eq_(len(handler.settings), 6)
        ok_("redis_host" in handler.settings)

    def test_lambda(self):
        import push_processor.handler as handler
        handler.redis_server = None
        result = handler.aws_lambda(dict(
        result = handler.Lambda.handler(dict(
            Bucket=""
        ), None)
        eq_(result, "Test event")

        handler.redis_server = None
        result = handler.aws_lambda(dict(), None)
        result = handler.Lambda.handler(dict(), None)
        eq_(result, "No record found in event")

        result = handler.aws_lambda(dict(
        result = handler.Lambda.handler(dict(
            Records=[dict(s3=dict(
                bucket=dict(name="push-test"),
                object=dict(key="test_protobuf_stream.gz")
@@ -71,16 +63,17 @@ def test_lambda_with_json(self, mock_pubkey):
        import push_processor.handler as handler
        mock_pubkey.return_value = mock_processor = Mock()
        mock_processor.latest_messages = {}
        l = handler.Lambda()

        handler.settings = {
        l.settings = {
            "s3_bucket": "push-test",
            "s3_key": "opts.js",
            "redis_port": 6379,
            "db_tablename": "push_messages_db",
            "file_type": "json"
        }

        result = handler.aws_lambda(dict(
        result = l.handle_event(dict(
            Records=[dict(s3=dict(
                bucket=dict(name="push-test"),
                object=dict(key="push_dash_logs.json")
@@ -92,18 +85,19 @@ def test_json_process(self):
    def test_json_process(self):
        pkey = uuid.uuid4().hex
        from push_processor.processor.pubkey import PubKeyProcessor
        from push_processor.handler import process_json_stream
        from push_processor.handler import Lambda
        l = Lambda()
        proc = PubKeyProcessor([pkey])
        msg = json.loads(TEST_MESSAGE)
        msg["Fields"]["jwt"] = {"crypto_key": pkey}
        msg["Fields"]["message_id"] = "jailj24il2j424ijiljlija"
        msg["Fields"]["message_size"] = 312
        msg["Fields"]["message_ttl"] = 600
        f = StringIO.StringIO(json.dumps(msg))
        redis_server = redis.StrictRedis()
        process_json_stream(redis_server, proc, f)
        eq_(redis_server.exists(pkey), True)
        eq_(redis_server.llen(pkey), 1)
        l.redis_server = redis.StrictRedis()
        l.process_json_stream(proc, f)
        eq_(l.redis_server.exists(pkey), True)
        eq_(l.redis_server.llen(pkey), 1)

        # Now run with at least 100 messages
        messages = []
@@ -112,6 +106,6 @@ def test_json_process(self):
nmsg["Fields"]["message_id"] = str(uuid.uuid4())
messages.append(nmsg)
f = StringIO.StringIO("\n".join([json.dumps(m) for m in messages]))
process_json_stream(redis_server, proc, f)
eq_(redis_server.exists(pkey), True)
eq_(redis_server.llen(pkey), 100)
l.process_json_stream(proc, f)
eq_(l.redis_server.exists(pkey), True)
eq_(l.redis_server.llen(pkey), 100)

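The test changes also show how the old module-level helpers map onto the new class: setup_objects becomes the work done in Lambda.__init__, and the process_*_stream functions drop their redis_server argument in favor of the instance's own connection. A short sketch of that mapping follows, with processor and stream assumed to exist as in the last test above.

from push_processor.handler import Lambda

# Before this commit:
#   handler.setup_objects()
#   handler.process_json_stream(handler.redis_server, processor, stream)

# After: the singleton instance owns the settings and the Redis connection.
lam = Lambda()
lam.process_json_stream(processor, stream)
# or, for heka-framed input: lam.process_heka_stream(processor, stream)
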