
feat: allow logging to batch send to aws firehose
Adds a component to the logger that optionally dumps the log output
directly to AWS Firehose. The log format should be set to 'json' so the
records can be read properly.

Closes #421
bbangert committed Mar 21, 2016
1 parent 9fdd5ce commit cad5423
Showing 6 changed files with 218 additions and 13 deletions.
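
For context, a minimal usage sketch (not part of this commit) of how the new Firehose option can be enabled once merged; the stream name "autopush-logs" is a hypothetical placeholder:

# Hypothetical setup sketch; "autopush-logs" is an assumed stream name.
from autopush.logging import PushLogger

logger = PushLogger.setup_logging(
    "Autopush",
    log_level="info",
    log_format="json",    # keep records as JSON so the Firehose output stays parseable
    log_output="none",    # optionally suppress stdout/file output and ship only to Firehose
    sentry_dsn=None,
    firehose_delivery_stream="autopush-logs",
)

From the command line, the same behaviour is selected with the new --firehose_stream_name argument (or the STREAM_NAME environment variable) added in autopush/main.py below.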
3 changes: 2 additions & 1 deletion .travis.yml
@@ -1,5 +1,6 @@
language: python
sudo: false
sudo: required
dist: precise
python:
- "2.7"
install:
120 changes: 115 additions & 5 deletions autopush/logging.py
@@ -2,10 +2,14 @@
"""
import io
import json
import Queue
import pkg_resources
import socket
import sys
import time
import threading

import boto3
import raven
from twisted.internet import reactor
from twisted.logger import (
@@ -49,7 +53,8 @@
@implementer(ILogObserver)
class PushLogger(object):
def __init__(self, logger_name, log_level="debug", log_format="json",
log_output="stdout", sentry_dsn=None):
log_output="stdout", sentry_dsn=None,
firehose_delivery_stream=None):
self.logger_name = "-".join([
logger_name,
pkg_resources.get_distribution("autopush").version
@@ -58,6 +63,8 @@ def __init__(self, logger_name, log_level="debug", log_format="json",
self.log_level = LogLevel.lookupByName(log_level)
if log_output == "stdout":
self._output = sys.stdout
elif log_output == "none":
self._output = None
else:
self._filename = log_output
self._output = "file"
@@ -70,6 +77,11 @@ def __init__(self, logger_name, log_level="debug", log_format="json",
release=raven.fetch_package_version("autopush"))
else:
self.raven_client = None
if firehose_delivery_stream:
self.firehose = FirehoseProcessor(
stream_name=firehose_delivery_stream)
else:
self.firehose = None

def __call__(self, event):
if event["log_level"] < self.log_level:
@@ -83,8 +95,13 @@ def __call__(self, event):
)

text = self.format_event(event)
self._output.write(unicode(text))
self._output.flush()

if self.firehose:
self.firehose.process(text)

if self._output:
self._output.write(unicode(text))
self._output.flush()

def json_format(self, event):
error = bool(event.get("isError")) or "failure" in event
@@ -108,23 +125,116 @@ def json_format(self, event):
}
# Add the nicely formatted message
msg["Fields"]["message"] = formatEvent(event)

return json.dumps(msg, skipkeys=True) + "\n"

def start(self):
if self._filename:
self._output = io.open(self._filename, "a", encoding="utf-8")
if self.firehose:
self.firehose.start()
globalLogPublisher.addObserver(self)

def stop(self):
globalLogPublisher.removeObserver(self)
if self._filename:
self._output.close()
self._output = None
if self.firehose:
self.firehose.stop()

@classmethod
def setup_logging(cls, logger_name, log_level="info", log_format="json",
log_output="stdout", sentry_dsn=None):
log_output="stdout", sentry_dsn=None,
firehose_delivery_stream=None):
pl = cls(logger_name, log_level=log_level, log_format=log_format,
log_output=log_output, sentry_dsn=sentry_dsn)
log_output=log_output, sentry_dsn=sentry_dsn,
firehose_delivery_stream=firehose_delivery_stream)
pl.start()
reactor.addSystemEventTrigger('before', 'shutdown', pl.stop)
return pl


class FirehoseProcessor(object):
RECORD_SEPARATOR = u"\x1e"
MAX_RECORD_SIZE = 1024 * 1024
MAX_REQUEST_SIZE = 4 * 1024 * 1024
MAX_RECORD_BATCH = 500
MAX_INTERVAL = 30

def __init__(self, stream_name, maxsize=0):
self._records = Queue.Queue(maxsize=maxsize)
self._prepped = []
self._total_size = 0
self._thread = None
self._client = boto3.client("firehose")
self._run = False
self._stream_name = stream_name

def start(self):
self._thread = threading.Thread(target=self._worker)
self._thread.start()

def stop(self):
self._records.put_nowait(None)
self._thread.join()
self._thread = None

def process(self, record):
try:
self._records.put_nowait(record)
except Queue.Full:
# Drop extra records
pass

def _worker(self):
self._last_send = time.time()
while True:
time_since_sent = time.time() - self._last_send
try:
record = self._records.get(
timeout=max(self.MAX_INTERVAL-time_since_sent, 0))
except Queue.Empty:
# Send the records
self._send_record_batch()
continue

if record is None:
# Stop signal so we exit
break

# Is this record going to put us over our request size?
rec_size = len(record) + 1
if self._total_size + rec_size >= self.MAX_REQUEST_SIZE:
self._send_record_batch()

# Store this record
self._prepped.append(record)
self._total_size += rec_size

if len(self._prepped) >= self.MAX_RECORD_BATCH:
self._send_record_batch()

# We're done running, send any remaining
self._send_record_batch()

def _send_record_batch(self):
if not self._prepped:
return

# Attempt to send the record batch up to three times, then give up
tries = 0
while tries < 3:
response = self._client.put_record_batch(
DeliveryStreamName=self._stream_name,
Records=[{"Data": bytes(self.RECORD_SEPARATOR + record)}
for record in self._prepped]
)
if response["FailedPutCount"] > 0:
tries += 1
else:
break

self._prepped = []
self._total_size = 0
self._last_send = time.time()
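
Note for consumers of the delivery stream: each record put to Firehose is the ASCII record separator (u"\x1e") followed by one JSON log line, and Firehose concatenates records on delivery. A minimal sketch of splitting a delivered payload back into events (the function and variable names here are illustrative only, not part of this commit):

import json

RECORD_SEPARATOR = u"\x1e"

def split_firehose_payload(payload):
    # Firehose delivers concatenated records; each one starts with the
    # record separator and ends with the newline appended by json_format().
    events = []
    for chunk in payload.split(RECORD_SEPARATOR):
        chunk = chunk.strip()
        if chunk:
            events.append(json.loads(chunk))
    return events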
25 changes: 19 additions & 6 deletions autopush/main.py
@@ -50,6 +50,9 @@ def add_shared_args(parser):
default="", env_var="LOG_LEVEL")
parser.add_argument('--log_output', help="Log output, stdout or filename",
default="stdout", env_var="LOG_OUTPUT")
parser.add_argument('--firehose_stream_name', help="Firehose Delivery"
" Stream Name", default="", env_var="STREAM_NAME",
type=str)
parser.add_argument('--crypto_key', help="Crypto key for tokens",
default=[], env_var="CRYPTO_KEY", type=str,
action="append")
@@ -359,9 +362,14 @@ def connection_main(sysargs=None, use_files=True):
log_format = "text" if args.human_logs else "json"
log_level = args.log_level or ("debug" if args.debug else "info")
sentry_dsn = bool(os.environ.get("SENTRY_DSN"))
PushLogger.setup_logging("Autopush", log_level=log_level,
log_format=log_format, log_output=args.log_output,
sentry_dsn=sentry_dsn)
PushLogger.setup_logging(
"Autopush",
log_level=log_level,
log_format=log_format,
log_output=args.log_output,
sentry_dsn=sentry_dsn,
firehose_delivery_stream=args.firehose_stream_name
)
settings = make_settings(
args,
port=args.port,
@@ -463,9 +471,14 @@ def endpoint_main(sysargs=None, use_files=True):
log_level = args.log_level or ("debug" if args.debug else "info")
log_format = "text" if args.human_logs else "json"
sentry_dsn = bool(os.environ.get("SENTRY_DSN"))
PushLogger.setup_logging("Autoendpoint", log_level=log_level,
log_format=log_format, log_output=args.log_output,
sentry_dsn=sentry_dsn)
PushLogger.setup_logging(
"Autoendpoint",
log_level=log_level,
log_format=log_format,
log_output=args.log_output,
sentry_dsn=sentry_dsn,
firehose_delivery_stream=args.firehose_stream_name
)

settings = make_settings(
args,
81 changes: 80 additions & 1 deletion autopush/tests/test_logging.py
@@ -10,7 +10,7 @@
from twisted.logger import Logger
from twisted.python import failure

from autopush.logging import PushLogger
from autopush.logging import PushLogger, FirehoseProcessor

log = Logger()

@@ -81,3 +81,82 @@ def test_file_output(self):
with open("testfile.txt") as f:
lines = f.readlines()
eq_(len(lines), 1)

@patch("autopush.logging.boto3")
def test_firehose_only_output(self, mock_boto3):
obj = PushLogger("Autoput", log_output="none",
firehose_delivery_stream="test")
obj.firehose = Mock(spec=FirehoseProcessor)
obj.start()
log.info("wow")
obj.stop()
eq_(len(obj.firehose.mock_calls), 3)
eq_(len(obj.firehose.process.mock_calls), 1)


class FirehoseProcessorTestCase(twisted.trial.unittest.TestCase):
def setUp(self):
patcher = patch("autopush.logging.boto3")
self.patcher = patcher
self.mock_boto = patcher.start()

def tearDown(self):
self.patcher.stop()

def test_full_queue(self):
proc = FirehoseProcessor("test", 1)
proc.process("test")
eq_(proc._records.full(), True)
proc.process("another")
eq_(proc._records.qsize(), 1)
eq_(proc._records.get(), "test")

def test_message_max_size(self):
proc = FirehoseProcessor("test")
proc.MAX_REQUEST_SIZE = 1

# Setup the mock
proc._client.put_record_batch.return_value = dict(FailedPutCount=0)

# Start and log
proc.start()
proc.process("a decently larger message")
proc.stop()
eq_(len(self.mock_boto.mock_calls), 2)
eq_(len(proc._client.put_record_batch.mock_calls), 1)

def test_message_max_batch(self):
proc = FirehoseProcessor("test")
proc.MAX_RECORD_BATCH = 1

# Setup the mock
proc._client.put_record_batch.return_value = dict(FailedPutCount=0)

# Start and log
proc.start()
proc.process("a decently larger message")
proc.stop()
eq_(len(self.mock_boto.mock_calls), 2)
eq_(len(proc._client.put_record_batch.mock_calls), 1)

def test_queue_timeout(self):
proc = FirehoseProcessor("test")
proc.MAX_INTERVAL = 0

proc.start()
proc.stop()
eq_(len(self.mock_boto.mock_calls), 1)

def test_batch_send_failure(self):
proc = FirehoseProcessor("test")
proc.MAX_RECORD_BATCH = 1

# Setup the mock
proc._client.put_record_batch.return_value = dict(FailedPutCount=1)

# Start and log
proc.start()
proc.process("a decently larger message")
proc.stop()
eq_(len(self.mock_boto.mock_calls), 4)
eq_(len(proc._client.put_record_batch.mock_calls), 3)
1 change: 1 addition & 0 deletions requirements.txt
@@ -8,6 +8,7 @@ apns==2.0.1
argparse==1.2.1
autobahn[twisted]==0.13.0
boto==2.38.0
boto3==1.3.0
cffi==1.1.2
characteristic==14.3.0
cryptography==1.2.3
1 change: 1 addition & 0 deletions test-requirements.txt
@@ -14,6 +14,7 @@ apns==2.0.1
argparse==1.2.1
autobahn[twisted]==0.13.0
boto==2.38.0
boto3==1.3.0
#cffi==1.1.2
characteristic==14.3.0
cryptography==1.2.3
