Kafka shim #11

Open · wants to merge 17 commits into master
10 changes: 8 additions & 2 deletions blueox/__init__.py
@@ -30,20 +30,26 @@
 from .errors import Error
 from .logger import LogHandler
 from .timer import timeit
+from .recorders import kafka_recorder
 
 log = logging.getLogger(__name__)
 
+OVERRIDE_KAFKA_RECORDER = os.getenv('BLUEOX_OVERRIDE_KAFKA_RECORDER', 0)
+
 
 def configure(host, port, recorder=None):
     """Initialize blueox
 
-    This instructs the blueox system where to send it's logging data. If blueox is not configured, log data will
+    This instructs the blueox system where to send its logging data. If blueox is not configured, log data will
     be silently dropped.
 
     Currently we support logging through the network (and the configured host and port) to a blueoxd instances, or
     to the specified recorder function
     """
-    if recorder:
+    if int(OVERRIDE_KAFKA_RECORDER) == 1:

Reviewer

not gonna lie, not a fan of magical overrides via env vars. Why not just have it come in as a recorder?

Author


Yeah, fair. I was hoping to avoid a bunch of edits throughout postal changing how it's called, but it would be simple enough to call

import blueox
from blueox.recorders import kafka

blueox.configure(recorder=kafka.send)

Reviewer

oh huh, wait, if that's the case, no, go back to your original way lol

log.info("Kafka override set, using kafka recorder")
_context_mod._recorder_function = kafka_recorder.send
elif recorder:
_context_mod._recorder_function = recorder
elif host and port:
network.init(host, port)
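With this change, configure() picks a recorder in order of precedence: the BLUEOX_OVERRIDE_KAFKA_RECORDER environment variable (read once at module import), then an explicit recorder argument, then the host/port network collector. A rough sketch of the two Kafka paths, assuming the module path introduced in this PR:

import blueox
from blueox.recorders import kafka_recorder

# 1) Pass the recorder explicitly (host/port are then ignored for routing):
blueox.configure(None, None, recorder=kafka_recorder.send)

# 2) Or export BLUEOX_OVERRIDE_KAFKA_RECORDER=1 before blueox is imported;
#    existing call sites keep passing host/port and the kafka recorder is
#    used anyway:
blueox.configure('127.0.0.1', 3514)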
10 changes: 10 additions & 0 deletions blueox/ports.py
@@ -41,3 +41,13 @@ def default_control_host(host=None):
 def default_collect_host(host=None):
     default_host = os.environ.get(ENV_VAR_COLLECT_HOST, DEFAULT_HOST)
     return _default_host(host, default_host, DEFAULT_COLLECT_PORT)
+
+
+# For consistency, we'll abstract kafka connections in the same way
+ENV_VAR_KAFKA_HOST = 'BLUEOX_KAFKA_HOST'
+DEFAULT_KAFKA_PORT = 9002
+
+
+def default_kafka_host(host=None):
+    default_host = os.environ.get(ENV_VAR_KAFKA_HOST, DEFAULT_HOST)
+    return _default_host(host, default_host, DEFAULT_KAFKA_PORT)
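When the new recorder's init() is called without a host, the bootstrap server resolves the same way as the other collectors: an explicit argument wins, otherwise BLUEOX_KAFKA_HOST (falling back to DEFAULT_HOST), with DEFAULT_KAFKA_PORT filled in by _default_host. A hypothetical example relying on the env-var fallback (the host name is made up):

# export BLUEOX_KAFKA_HOST=kafka01:9092
from blueox.recorders import kafka_recorder

kafka_recorder.init()  # bootstrap host comes from ports.default_kafka_host()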
Empty file added blueox/recorders/__init__.py
Empty file.
101 changes: 101 additions & 0 deletions blueox/recorders/kafka_recorder.py
@@ -0,0 +1,101 @@
# -*- coding: utf-8 -*-
"""
blueox.kafka
~~~~~~~~

This module provides the interface into Kafka

:copyright: (c) 2018 by Aaron Biller??
:license: ISC, see LICENSE for more details.

"""
import atexit
import logging
import msgpack

from kafka import KafkaProducer

from .. import ports
from .. import utils

log = logging.getLogger(__name__)

# If we have pending outgoing messages, this is how long we'll wait after
# being told to exit.
LINGER_SHUTDOWN_MSECS = 2000

# Producer can be shared between threads
_kafka_producer = None


def init(host=None):
    """Initialize the global kafka producer

    Supports a host arg with an overriding kafka host string
    in the format 'hostname:port'
    """
    global _kafka_producer

    host = ports.default_kafka_host(host)

    _kafka_producer = KafkaProducer(bootstrap_servers=host)


def _serialize_context(context):
    context_dict = context.to_dict()
    for key in ('host', 'type'):
        if len(context_dict.get(key, "")) > 64:
            raise ValueError("Value too long: %r" % key)

    context_dict = {
        k: v.encode('utf-8') if isinstance(v, unicode)
        else v for k, v in context_dict.items()
    }

    try:
        context_data = msgpack.packb(context_dict)
    except TypeError:
        try:
            # If we fail to serialize our context, we can try again with an
            # enhanced packer (it's slower though)
            context_data = msgpack.packb(context_dict,
                                         default=utils.msgpack_encode_default)
        except TypeError:
            log.exception("Serialization failure (not fatal, dropping data)")

            # One last try after dropping the body
            context_dict['body'] = None
            context_data = msgpack.packb(context_dict)

    return context_data


def send(context):
    global _kafka_producer

    try:
        context_data = _serialize_context(context)
    except Exception:
        log.exception("Failed to serialize context")
        return

    if _kafka_producer:
        try:
            log.debug("Sending msg")
            _kafka_producer.send('events', context_data)
        except Exception:
            log.exception("Failed during publish to kafka.")
    else:
        log.info("Skipping sending event %s", context.name)


def close():
    global _kafka_producer

    if _kafka_producer:
        _kafka_producer.flush()
        _kafka_producer.close(timeout=LINGER_SHUTDOWN_MSECS)
        _kafka_producer = None


atexit.register(close)
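A minimal end-to-end sketch of wiring this up, assuming blueox's existing Context/set API (the 'events' topic and msgpack payload format are hard-coded in send() above):

import blueox
from blueox.recorders import kafka_recorder

kafka_recorder.init()  # or kafka_recorder.init('kafka01:9092')
blueox.configure(None, None, recorder=kafka_recorder.send)

with blueox.Context('request'):
    blueox.set('user_id', 42)
# Each completed context is handed to kafka_recorder.send(); on interpreter
# exit, atexit runs close(), flushing any pending messages.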
1 change: 1 addition & 0 deletions requirements.txt
@@ -4,3 +4,4 @@ pyflakes
 tornado==3.2
 boto
 yapf
+kafka-python