Skip to content
This repository has been archived by the owner on Jan 12, 2019. It is now read-only.

Commit

Permalink
Merge pull request #5 from mozilla-services/bug/issue-3
Browse files Browse the repository at this point in the history
bug: update json reader to use twisted events from json reader
  • Loading branch information
jrconlin committed Mar 22, 2016
2 parents 455f994 + 3339357 commit b57ac42
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 5 deletions.
8 changes: 3 additions & 5 deletions push_processor/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os

import redis
from twisted.logger import eventsFromJSONLogFile

from push_processor.aws_helpers import s3_open
from push_processor.db import (
Expand Down Expand Up @@ -79,9 +80,6 @@ def process_heka_stream(redis_server, processor, stream):


def process_json_stream(redis_server, processor, stream):
json_line = stream.readline()
while json_line:
msg = Message(json=json.loads(json_line))
processor.process_message(msg)
json_line = stream.readline()
for msg in eventsFromJSONLogFile(stream):
processor.process_message(Message(json=msg))
dump_latest_messages_to_redis(redis_server, processor.latest_messages)
24 changes: 24 additions & 0 deletions push_processor/tests/test_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import uuid

import redis
from mock import Mock, patch
from nose.tools import eq_, ok_

import push_messages.tests as pmtests
Expand Down Expand Up @@ -65,6 +66,29 @@ def test_lambda(self):
), None)
eq_(result, None)

@patch("push_processor.handler.PubKeyProcessor")
def test_lambda_with_json(self, mock_pubkey):
import push_processor.handler as handler
mock_pubkey.return_value = mock_processor = Mock()
mock_processor.latest_messages = {}

handler.settings = {
"s3_bucket": "push-test",
"s3_key": "opts.js",
"redis_port": 6379,
"db_tablename": "push_messages_db",
"file_type": "json"
}

result = handler.aws_lambda(dict(
Records=[dict(s3=dict(
bucket=dict(name="push-test"),
object=dict(key="push_dash_logs.json")
))]
), None)
eq_(result, None)
eq_(len(mock_processor.process_message.mock_calls), 407)

def test_json_process(self):
pkey = uuid.uuid4().hex
from push_processor.processor.pubkey import PubKeyProcessor
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ boto3==1.2.6
gzip_reader==0.1
protobuf==2.6.1
redis==2.10.5
twisted==16.0.0
1 change: 1 addition & 0 deletions test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ psutil==4.0.0
mock==1.3.0
nose==1.3.7
coverage==4.0.3
twisted==16.0.0

0 comments on commit b57ac42

Please sign in to comment.