Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka shim #11

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
11 changes: 11 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
blueox (0.12.0)
* Move recorders to separate module
* Add pycernan recorder
* Update the way blueox is configured to allow desired
recorder from imported constant

-- Aaron Biller <[email protected]> Tue Sep 4 12:40:45 2018 -0400

blueox (0.11.6.4)
* Fix encoding of unknown types

blueox (0.11.6.3)
* Fix handling of unicode strings

Expand Down
50 changes: 33 additions & 17 deletions blueox/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"""

__title__ = 'blueox'
__version__ = '0.11.6.3'
__version__ = '0.12.0'
__author__ = 'Rhett Garber'
__author_email__ = '[email protected]'
__license__ = 'ISC'
Expand All @@ -18,10 +18,8 @@
__url__ = 'https://github.com/rhettg/BlueOx'

import logging
import os

from . import utils
from . import network
from . import ports
from .context import (
Context, set, append, add, context_wrap, current_context, find_context,
Expand All @@ -30,45 +28,63 @@
from .errors import Error
from .logger import LogHandler
from .timer import timeit
from .recorders import pycernan, zmq

log = logging.getLogger(__name__)

ZMQ_RECORDER = 'zmq'
PYCERNAN_RECORDER = 'pycernan'
RECORDERS = {
ZMQ_RECORDER: zmq,
PYCERNAN_RECORDER: pycernan,
}
DEFAULT_RECORDER = ZMQ_RECORDER


def configure(host, port, recorder=None):
"""Initialize blueox

This instructs the blueox system where to send it's logging data. If blueox is not configured, log data will
be silently dropped.
This instructs the blueox system where to send its logging data.
If blueox is not configured, log data will be silently dropped.

Currently we support logging through the network (and the configured host and port) to a blueoxd instances, or
to the specified recorder function
Currently we support logging through the network (and the configured host
and port) to a blueoxd instances, or to the specified recorder function.
"""
if recorder:
if callable(recorder):
_context_mod._recorder_function = recorder
elif host and port:
network.init(host, port)
_context_mod._recorder_function = network.send

else:
log.info("Empty blueox configuration")
_context_mod._recorder_function = None
_rec = RECORDERS.get(recorder, None)

if _rec is not None:
_rec.init(host, port)
_context_mod._recorder_function = _rec.send
else:
log.info("Empty blueox configuration")
_context_mod._recorder_function = None

def default_configure(host=None):

def default_configure(host=None, recorder=DEFAULT_RECORDER):
"""Configure BlueOx based on defaults

Accepts a connection string override in the form `localhost:3514`. Respects
environment variable BLUEOX_HOST
"""
host = ports.default_collect_host(host)
_rec = RECORDERS.get(recorder, None)
if _rec is None:
_rec = RECORDERS.get(DEFAULT_RECORDER)

host = _rec.default_host(host)
hostname, port = host.split(':')

try:
int_port = int(port)
except ValueError:
raise Error("Invalid value for port")

configure(hostname, int_port)
configure(hostname, int_port, recorder=recorder)


def shutdown():
network.close()
zmq.close()
pycernan.close()
16 changes: 9 additions & 7 deletions blueox/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
blueox.client
~~~~~~~~

This module provides utilities for writing client applications which connect or use blueox data.
This module provides utilities for writing client applications
which connector use blueox data.

:copyright: (c) 2012 by Rhett Garber
:license: ISC, see LICENSE for more details.
Expand Down Expand Up @@ -35,7 +36,8 @@ def default_host(host=None):


def decode_stream(stream):
"""A generator which reads data out of the buffered file stream, unpacks and decodes the blueox events
"""A generator which reads data out of the buffered file stream,
unpacks and decodes the blueox events

This is useful for parsing on disk log files generated by blueoxd
"""
Expand Down Expand Up @@ -97,8 +99,8 @@ def subscribe_stream(control_host, subscribe):
sock.connect("tcp://%s" % (stream_host,))

# Now that we are connected, loop almost forever emiting events.
# If we fail to receive any events within the specified timeout, we'll quit
# and verify that we are connected to a valid stream.
# If we fail to receive any events within the specified timeout,
# we'll quit and verify that we are connected to a valid stream.
poller = zmq.Poller()
poller.register(sock, zmq.POLLIN)
while True:
Expand All @@ -113,7 +115,7 @@ def subscribe_stream(control_host, subscribe):
if not prefix and subscription and channel != subscription:
continue

yield msgpack.unpackb(data,encoding='utf8')
yield msgpack.unpackb(data, encoding='utf8')
else:
break

Expand All @@ -137,10 +139,10 @@ def stdin_stream():

class Grouper(object):
"""Utility for grouping events and sub-events together.

Events fed into a Grouper are joined by their common 'id'. Encountering the
parent event type will trigger emitting a list of all events and sub events
for that single id.
for that single id.

This assumes that the parent event will be the last encountered.

Expand Down
19 changes: 11 additions & 8 deletions blueox/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import logging

from . import utils
from . import network

log = logging.getLogger(__name__)

Expand All @@ -41,8 +40,10 @@ def __init__(self, type_name, id=None, sample=None):
heirarchy of parent requests. Examples:

'.foo' - Will generate a name like '<parent name>.foo'
'.foo.bar' - If the parent ends in '.foo', the final name will be '<parent name>.bar'
'^.foo' - Will use the top-most context, generating '<top parent name>.foo'
'.foo.bar' - If the parent ends in '.foo', the final name
will be '<parent name>.bar'
'^.foo' - Will use the top-most context, generating
'<top parent name>.foo'
'top.foo.bar' - The name will be based on the longest matched
parent context. If there is a parent context named 'top' and a
parent context named 'top.foo', the new context will be named
Expand Down Expand Up @@ -111,11 +112,13 @@ def __init__(self, type_name, id=None, sample=None):
elif parent_ctx:
self.id = parent_ctx.id
else:
# Generate an id if one wasn't provided and we don't have any parents
# We're going to encode the time as the front 4 bytes so we have some order to the ids
# that could prove useful later on by making sorting a little easier.
self.id = (struct.pack(">L", int(time.time())) + os.urandom(12)).encode(
'hex')
# Generate an id if one wasn't provided and we don't have any
# parents. We're going to encode the time as the front 4 bytes
# so we have some order to the ids that could prove useful
# later on by making sorting a little easier.
self.id = (
struct.pack(">L", int(time.time()))
+ os.urandom(12)).encode('hex')

if parent_ctx and not parent_ctx.enabled:
self.enabled = False
Expand Down
1 change: 0 additions & 1 deletion blueox/contrib/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@

1 change: 0 additions & 1 deletion blueox/contrib/celery/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@

20 changes: 13 additions & 7 deletions blueox/contrib/celery/celery_signals.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
"""Hooks for gathering celery task data into blueox.

Importing this module will register signal handlers into Celery worker's runtime.

We also will track creation of tasks on the client side.
Importing this module will register signal handlers into Celery
worker's runtime. We also will track creation of tasks on the client side.
"""
import traceback

Expand Down Expand Up @@ -33,17 +32,24 @@ def on_task_sent(sender=None, body=None, **kwargs):
@signals.task_sent.connect
def on_task_sent(**kwargs):
with blueox.Context('.celery.task_sent'):
# Arguments for this signal are different than the worker signals. Sometimes
# they are even different than what the documentation says. See also
# https://github.com/celery/celery/issues/1606
# Arguments for this signal are different than the worker signals.
# Sometimes they are even different than what the documentation
# says. See also https://github.com/celery/celery/issues/1606
blueox.set('task_id', kwargs.get('task_id', kwargs.get('id')))
blueox.set('task', str(kwargs['task']))
blueox.set('eta', kwargs['eta'])


@signals.worker_process_init.connect
def on_worker_process_init(**kwargs):
if hasattr(settings, 'BLUEOX_HOST'):
if hasattr(settings, 'BLUEOX_PYCERNAN_HOST'):
if settings.BLUEOX_PYCERNAN_HOST:
rec = blueox.PYCERNAN_RECORDER
blueox.default_configure(
settings.BLUEOX_PYCERNAN_HOST, recorder=rec)
else:
blueox.configure(None, None)
elif hasattr(settings, 'BLUEOX_HOST'):
if settings.BLUEOX_HOST:
blueox.default_configure(settings.BLUEOX_HOST)
else:
Expand Down
1 change: 0 additions & 1 deletion blueox/contrib/django/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@

14 changes: 11 additions & 3 deletions blueox/contrib/django/middleware.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import sys
import traceback
import logging

import blueox

Expand All @@ -10,7 +9,14 @@
class Middleware(object):

def __init__(self):
if hasattr(settings, 'BLUEOX_HOST'):
if hasattr(settings, 'BLUEOX_PYCERNAN_HOST'):
if settings.BLUEOX_PYCERNAN_HOST:
rec = blueox.PYCERNAN_RECORDER
blueox.default_configure(
settings.BLUEOX_PYCERNAN_HOST, recorder=rec)
else:
blueox.configure(None, None)
elif hasattr(settings, 'BLUEOX_HOST'):
if settings.BLUEOX_HOST:
blueox.default_configure(settings.BLUEOX_HOST)
else:
Expand All @@ -28,7 +34,9 @@ def process_request(self, request):

headers = {}
for k, v in request.META.iteritems():
if k.startswith('HTTP_') or k in ('CONTENT_LENGTH', 'CONTENT_TYPE'):
if (
k.startswith('HTTP_') or
k in ('CONTENT_LENGTH', 'CONTENT_TYPE')):
headers[k] = v
blueox.set('headers', headers)

Expand Down
12 changes: 9 additions & 3 deletions blueox/contrib/flask/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,13 @@ class BlueOxMiddleware(object):
def __init__(self, app):
self.app = app

if 'BLUEOX_HOST' in app.config:
if 'BLUEOX_PYCERNAN_HOST' in app.config:
self.blueox_pycernan_host = app.config['BLUEOX_PYCERNAN_HOST']
if self.blueox_pycernan_host:
rec = blueox.PYCERNAN_RECORDER
blueox.default_configure(
self.blueox_pycernan_host, recorder=rec)
elif 'BLUEOX_HOST' in app.config:
self.blueox_host = app.config['BLUEOX_HOST']
if self.blueox_host:
blueox.default_configure(self.blueox_host)
Expand All @@ -45,8 +51,8 @@ def before_request(self, *args, **kwargs):
headers = {}
for k, v in request.environ.iteritems():
if (
k.startswith('HTTP_') or k in
('CONTENT_LENGTH', 'CONTENT_TYPE')):
k.startswith('HTTP_') or
k in ('CONTENT_LENGTH', 'CONTENT_TYPE')):
headers[k] = v

blueox.set('headers', headers)
Expand Down
7 changes: 5 additions & 2 deletions blueox/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
blueox.logger
~~~~~~~~

This module provides integration with blueox and standard python logging module.
This module provides integration with blueox and standard
python logging module.

:copyright: (c) 2012 by Rhett Garber
:license: ISC, see LICENSE for more details.

Expand All @@ -20,7 +22,8 @@ class LogHandler(logging.Handler):
Records standard fields such as logger name, level the message and if an
exception was provided, the string formatted exception.

The type name, if not specified will be something like '<my parent context>.log'
The type name, if not specified will be something like
'<my parent context>.log'
"""

def __init__(self, type_name=None):
Expand Down
12 changes: 11 additions & 1 deletion blueox/ports.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def _default_host(host, default_host, default_port):
if not host:
host = default_host
if ':' not in host:
host = "{}:{}".format(host, default_port)
host = '{}:{}'.format(host, default_port)

return host

Expand All @@ -41,3 +41,13 @@ def default_control_host(host=None):
def default_collect_host(host=None):
default_host = os.environ.get(ENV_VAR_COLLECT_HOST, DEFAULT_HOST)
return _default_host(host, default_host, DEFAULT_COLLECT_PORT)


# For consistency, we'll abstract pycernan connections in the same way
ENV_VAR_PYCERNAN_HOST = 'BLUEOX_PYCERNAN_HOST'
DEFAULT_PYCERNAN_PORT = 2003


def default_pycernan_host(host=None):
default_host = os.environ.get(ENV_VAR_PYCERNAN_HOST, DEFAULT_HOST)
return _default_host(host, default_host, DEFAULT_PYCERNAN_PORT)
Empty file added blueox/recorders/__init__.py
Empty file.
Loading