
Commit 641862e

Merge pull request segmentio#162 from pberganza-applaudostudios/fix/linting-errors

Fix/linting errors
lubird authored Jan 20, 2020
2 parents 2a314e6 + b89bda9 commit 641862e
Showing 14 changed files with 134 additions and 70 deletions.
16 changes: 12 additions & 4 deletions .circleci/config.yml
@@ -10,7 +10,7 @@ jobs:
steps:
- checkout
- run: pip install --user .
- run: sudo pip install coverage pylint==1.9.3 pycodestyle
- run: sudo pip install coverage pylint==1.9.3 flake8
- run: make test
- persist_to_workspace:
root: .
@@ -19,7 +19,7 @@ jobs:
- store_artifacts:
path: pylint.out
- store_artifacts:
path: pycodestyle.out
path: flake8.out

coverage:
docker:
@@ -51,7 +51,11 @@ jobs:
steps:
- checkout
- run: pip install --user .
- run: sudo pip install setuptools coverage pylint==1.9.3 pycodestyle
- run: sudo pip install setuptools coverage pylint==1.9.3 flake8
- run:
name: Linting with Flake8
command: |
git diff origin/master..HEAD analytics | flake8 --diff --max-complexity=10 analytics
- run: make test
- run: make e2e_test

@@ -61,7 +65,11 @@ jobs:
steps:
- checkout
- run: pip install --user .
- run: sudo pip install coverage pylint==1.9.3 pycodestyle
- run: sudo pip install coverage pylint==1.9.3 flake8
- run:
name: Linting with Flake8
command: |
git diff origin/master..HEAD analytics | flake8 --diff --max-complexity=10 analytics
- run: make test
- run: make e2e_test

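The new "Linting with Flake8" step pipes a branch diff into flake8 so only lines touched since origin/master can fail the build. Below is a minimal Python sketch of that gate; it assumes git and flake8 are available on PATH and mirrors the command above, rather than reproducing the CI environment exactly.

    import subprocess
    import sys

    def lint_changed_lines(base="origin/master", path="analytics"):
        """Pipe `git diff` into `flake8 --diff` so only touched lines are checked."""
        diff = subprocess.run(
            ["git", "diff", f"{base}..HEAD", path],
            check=True, capture_output=True, text=True,
        ).stdout
        result = subprocess.run(
            ["flake8", "--diff", "--max-complexity=10", path],
            input=diff, text=True,
        )
        return result.returncode  # non-zero when the diff introduces lint errors

    if __name__ == "__main__":
        sys.exit(lint_changed_lines())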
4 changes: 1 addition & 3 deletions Makefile
@@ -1,8 +1,6 @@
test:
pylint --rcfile=.pylintrc --reports=y --exit-zero analytics | tee pylint.out
# fail on pycodestyle errors on the code change
git diff origin/master..HEAD analytics | pycodestyle --ignore=E501 --diff --statistics --count
pycodestyle --ignore=E501 --statistics analytics > pycodestyle.out || true
flake8 --max-complexity=10 --statistics analytics > flake8.out || true
coverage run --branch --include=analytics/\* --omit=*/test* setup.py test

release:
12 changes: 10 additions & 2 deletions analytics/__init__.py
@@ -19,30 +19,37 @@ def track(*args, **kwargs):
"""Send a track call."""
_proxy('track', *args, **kwargs)


def identify(*args, **kwargs):
"""Send a identify call."""
_proxy('identify', *args, **kwargs)


def group(*args, **kwargs):
"""Send a group call."""
_proxy('group', *args, **kwargs)


def alias(*args, **kwargs):
"""Send a alias call."""
_proxy('alias', *args, **kwargs)


def page(*args, **kwargs):
"""Send a page call."""
_proxy('page', *args, **kwargs)


def screen(*args, **kwargs):
"""Send a screen call."""
_proxy('screen', *args, **kwargs)


def flush():
"""Tell the client to flush."""
_proxy('flush')


def join():
"""Block program until the client clears the queue"""
_proxy('join')
@@ -58,8 +65,9 @@ def _proxy(method, *args, **kwargs):
"""Create an analytics client if one doesn't exist and send to it."""
global default_client
if not default_client:
default_client = Client(write_key, host=host, debug=debug, on_error=on_error,
send=send, sync_mode=sync_mode)
default_client = Client(write_key, host=host, debug=debug,
on_error=on_error, send=send,
sync_mode=sync_mode)

fn = getattr(default_client, method)
fn(*args, **kwargs)
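For context, _proxy is what lets the module itself act as a client: the first proxied call lazily builds a singleton Client from the module-level settings. A short usage sketch, with a placeholder write key and hypothetical user and event names:

    import analytics

    analytics.write_key = 'YOUR_WRITE_KEY'  # placeholder; set before the first call

    analytics.identify('user123', {'email': 'user@example.com'})
    analytics.track('user123', 'Signed Up', {'plan': 'starter'})
    analytics.flush()  # push anything still sitting in the queue
    analytics.join()   # block until the consumer threads drain it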
29 changes: 18 additions & 11 deletions analytics/client.py
@@ -14,7 +14,7 @@

try:
import queue
except:
except ImportError:
import Queue as queue


@@ -48,17 +48,20 @@ def __init__(self, write_key=None, host=None, debug=False,
self.consumers = None
else:
# On program exit, allow the consumer thread to exit cleanly.
# This prevents exceptions and a messy shutdown when the interpreter is
# destroyed before the daemon thread finishes execution. However, it
# is *not* the same as flushing the queue! To guarantee all messages
# have been delivered, you'll still need to call flush().
# This prevents exceptions and a messy shutdown when the
# interpreter is destroyed before the daemon thread finishes
# execution. However, it is *not* the same as flushing the queue!
# To guarantee all messages have been delivered, you'll still need
# to call flush().
if send:
atexit.register(self.join)
for n in range(thread):
self.consumers = []
consumer = Consumer(self.queue, write_key, host=host, on_error=on_error,
flush_at=flush_at, flush_interval=flush_interval,
gzip=gzip, retries=max_retries, timeout=timeout)
consumer = Consumer(
self.queue, write_key, host=host, on_error=on_error,
flush_at=flush_at, flush_interval=flush_interval,
gzip=gzip, retries=max_retries, timeout=timeout,
)
self.consumers.append(consumer)

# if we've disabled sending, just don't start the consumer
@@ -87,7 +90,8 @@ def identify(self, user_id=None, traits=None, context=None, timestamp=None,
return self._enqueue(msg)

def track(self, user_id=None, event=None, properties=None, context=None,
timestamp=None, anonymous_id=None, integrations=None, message_id=None):
timestamp=None, anonymous_id=None, integrations=None,
message_id=None):
properties = properties or {}
context = context or {}
integrations = integrations or {}
@@ -129,7 +133,8 @@ def alias(self, previous_id=None, user_id=None, context=None,
return self._enqueue(msg)

def group(self, user_id=None, group_id=None, traits=None, context=None,
timestamp=None, anonymous_id=None, integrations=None, message_id=None):
timestamp=None, anonymous_id=None, integrations=None,
message_id=None):
traits = traits or {}
context = context or {}
integrations = integrations or {}
@@ -266,7 +271,9 @@ def flush(self):
self.log.debug('successfully flushed about %s items.', size)

def join(self):
"""Ends the consumer thread once the queue is empty. Blocks execution until finished"""
"""Ends the consumer thread once the queue is empty.
Blocks execution until finished
"""
for consumer in self.consumers:
consumer.pause()
try:
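The reflowed comment in Client.__init__ draws a distinction worth keeping in mind: the atexit.register(self.join) hook only lets consumer threads shut down cleanly; it does not guarantee delivery of queued messages. A hedged sketch of the explicit shutdown sequence that implies (placeholder write key, hypothetical event):

    from analytics.client import Client

    client = Client('YOUR_WRITE_KEY')            # placeholder write key
    client.track('user123', 'Checkout Started')  # hypothetical event
    client.flush()  # wait for queued messages to be delivered
    client.join()   # then let the consumer threads exit cleanly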
18 changes: 12 additions & 6 deletions analytics/consumer.py
@@ -4,7 +4,6 @@
import backoff
import json

from analytics.version import VERSION
from analytics.request import post, APIError, DatetimeSerializer

try:
@@ -23,8 +22,9 @@ class Consumer(Thread):
"""Consumes the messages from the client's queue."""
log = logging.getLogger('segment')

def __init__(self, queue, write_key, flush_at=100, host=None, on_error=None,
flush_interval=0.5, gzip=False, retries=10, timeout=15):
def __init__(self, queue, write_key, flush_at=100, host=None,
on_error=None, flush_interval=0.5, gzip=False, retries=10,
timeout=15):
"""Create a consumer thread."""
Thread.__init__(self)
# Make consumer a daemon thread so that it doesn't block program exit
@@ -38,7 +38,8 @@ def __init__(self, queue, write_key, flush_at=100, host=None, on_error=None,
self.gzip = gzip
# It's important to set running in the constructor: if we are asked to
# pause immediately after construction, we might set running to True in
# run() *after* we set it to False in pause... and keep running forever.
# run() *after* we set it to False in pause... and keep running
# forever.
self.running = True
self.retries = retries
self.timeout = timeout
@@ -113,14 +114,19 @@ def request(self, batch):

def fatal_exception(exc):
if isinstance(exc, APIError):
# retry on server errors and client errors with 429 status code (rate limited),
# retry on server errors and client errors
# with 429 status code (rate limited),
# don't retry on other client errors
return (400 <= exc.status < 500) and exc.status != 429
else:
# retry on all other errors (eg. network)
return False

@backoff.on_exception(backoff.expo, Exception, max_tries=self.retries + 1, giveup=fatal_exception)
@backoff.on_exception(
backoff.expo,
Exception,
max_tries=self.retries + 1,
giveup=fatal_exception)
def send_request():
post(self.write_key, self.host, gzip=self.gzip,
timeout=self.timeout, batch=batch)
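The reformatted decorator makes request()'s retry policy easier to read against its giveup predicate: exponential backoff retries everything except client errors, with 429 (rate limited) still retried. A self-contained sketch of that policy using the real backoff package and a stand-in APIError (not the module's exact class):

    import backoff

    class APIError(Exception):
        """Stand-in for analytics.request.APIError, carrying an HTTP status."""
        def __init__(self, status, message=''):
            super().__init__(message)
            self.status = status

    def fatal_exception(exc):
        # True tells backoff to give up: any 4xx except 429 (rate limited).
        if isinstance(exc, APIError):
            return (400 <= exc.status < 500) and exc.status != 429
        return False  # everything else (e.g. network errors) is retried

    @backoff.on_exception(backoff.expo, Exception, max_tries=4,
                          giveup=fatal_exception)
    def send_request():
        raise APIError(500, 'server error')  # a 5xx, so retried up to max_tries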
6 changes: 4 additions & 2 deletions analytics/request.py
@@ -30,11 +30,13 @@ def post(write_key, host=None, gzip=False, timeout=15, **kwargs):
headers['Content-Encoding'] = 'gzip'
buf = BytesIO()
with GzipFile(fileobj=buf, mode='w') as gz:
# 'data' was produced by json.dumps(), whose default encoding is utf-8.
# 'data' was produced by json.dumps(),
# whose default encoding is utf-8.
gz.write(data.encode('utf-8'))
data = buf.getvalue()

res = _session.post(url, data=data, auth=auth, headers=headers, timeout=timeout)
res = _session.post(url, data=data, auth=auth,
headers=headers, timeout=timeout)

if res.status_code == 200:
log.debug('data uploaded successfully')
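The rewrapped comment in post() records why the body is encoded before compression: json.dumps() returns text, and the gzip stream wants bytes. A standalone sketch of that path with a hypothetical one-event batch:

    import json
    from gzip import GzipFile
    from io import BytesIO

    payload = json.dumps({'batch': [{'type': 'track', 'event': 'python event'}]})
    buf = BytesIO()
    with GzipFile(fileobj=buf, mode='w') as gz:
        gz.write(payload.encode('utf-8'))  # encode the text before gzipping
    body = buf.getvalue()                  # compressed bytes for the request body
    headers = {'Content-Type': 'application/json', 'Content-Encoding': 'gzip'}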
2 changes: 2 additions & 0 deletions analytics/test/__init__.py
@@ -3,10 +3,12 @@
import logging
import sys


def all_names():
for _, modname, _ in pkgutil.iter_modules(__path__):
yield 'analytics.test.' + modname


def all():
logging.basicConfig(stream=sys.stderr)
return unittest.defaultTestLoader.loadTestsFromNames(all_names())
3 changes: 2 additions & 1 deletion analytics/test/client.py
@@ -326,7 +326,8 @@ def mock_post_fn(*args, **kwargs):

# the post function should be called 2 times, with a batch size of 10
# each time.
with mock.patch('analytics.consumer.post', side_effect=mock_post_fn) as mock_post:
with mock.patch('analytics.consumer.post', side_effect=mock_post_fn) \
as mock_post:
for _ in range(20):
client.identify('userId', {'trait': 'value'})
time.sleep(1)
33 changes: 20 additions & 13 deletions analytics/test/consumer.py
@@ -52,8 +52,9 @@ def test_upload(self):
self.assertTrue(success)

def test_flush_interval(self):
# Put _n_ items in the queue, pausing a little bit more than _flush_interval_
# after each one. The consumer should upload _n_ times.
# Put _n_ items in the queue, pausing a little bit more than
# _flush_interval_ after each one.
# The consumer should upload _n_ times.
q = Queue()
flush_interval = 0.3
consumer = Consumer(q, 'testsecret', flush_at=10,
@@ -71,8 +72,8 @@ def test_multiple_uploads_per_interval(self):
self.assertEqual(mock_post.call_count, 3)

def test_multiple_uploads_per_interval(self):
# Put _flush_at*2_ items in the queue at once, then pause for _flush_interval_.
# The consumer should upload 2 times.
# Put _flush_at*2_ items in the queue at once, then pause for
# _flush_interval_. The consumer should upload 2 times.
q = Queue()
flush_interval = 0.5
flush_at = 10
@@ -99,34 +100,38 @@ def test_request(self):
}
consumer.request([track])

def _test_request_retry(self, consumer, expected_exception, exception_count):
def _test_request_retry(self, consumer,
expected_exception, exception_count):

def mock_post(*args, **kwargs):
mock_post.call_count += 1
if mock_post.call_count <= exception_count:
raise expected_exception
mock_post.call_count = 0

with mock.patch('analytics.consumer.post', mock.Mock(side_effect=mock_post)):
with mock.patch('analytics.consumer.post',
mock.Mock(side_effect=mock_post)):
track = {
'type': 'track',
'event': 'python event',
'userId': 'userId'
}
            # request() should succeed if the number of exceptions raised is less
            # than the retries parameter.
            # request() should succeed if the number of exceptions raised is
            # less than the retries parameter.
if exception_count <= consumer.retries:
consumer.request([track])
else:
# if exceptions are raised more times than the retries parameter,
# we expect the exception to be returned to the caller.
# if exceptions are raised more times than the retries
# parameter, we expect the exception to be returned to
# the caller.
try:
consumer.request([track])
except type(expected_exception) as exc:
self.assertEqual(exc, expected_exception)
else:
self.fail(
"request() should raise an exception if still failing after %d retries" % consumer.retries)
"request() should raise an exception if still failing "
"after %d retries" % consumer.retries)

def test_request_retry(self):
# we should retry on general errors
@@ -180,10 +185,12 @@ def mock_post_fn(_, data, **kwargs):
res = mock.Mock()
res.status_code = 200
self.assertTrue(len(data.encode()) < 500000,
'batch size (%d) exceeds 500KB limit' % len(data.encode()))
'batch size (%d) exceeds 500KB limit'
% len(data.encode()))
return res

with mock.patch('analytics.request._session.post', side_effect=mock_post_fn) as mock_post:
with mock.patch('analytics.request._session.post',
side_effect=mock_post_fn) as mock_post:
consumer.start()
for _ in range(0, n_msgs + 2):
q.put(track)
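The rewrapped assertion above enforces Segment's 500KB-per-request batch limit on the encoded payload. A tiny guard in the same spirit (the constant mirrors the test, not an official constant in the library):

    MAX_BATCH_BYTES = 500000  # the 500KB limit the test asserts against

    def batch_fits(data):
        """Return True when the JSON-encoded batch stays under the limit."""
        return len(data.encode('utf-8')) < MAX_BATCH_BYTES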
2 changes: 1 addition & 1 deletion analytics/test/module.py
@@ -26,7 +26,7 @@ def test_track(self):
analytics.flush()

def test_identify(self):
analytics.identify('userId', { 'email': '[email protected]' })
analytics.identify('userId', {'email': '[email protected]'})
analytics.flush()

def test_group(self):
10 changes: 6 additions & 4 deletions analytics/test/request.py
@@ -17,13 +17,15 @@ def test_valid_request(self):
self.assertEqual(res.status_code, 200)

def test_invalid_request_error(self):
self.assertRaises(Exception, post, 'testsecret', 'https://api.segment.io', False, '[{]')
self.assertRaises(Exception, post, 'testsecret',
'https://api.segment.io', False, '[{]')

def test_invalid_host(self):
self.assertRaises(Exception, post, 'testsecret', 'api.segment.io/', batch=[])
self.assertRaises(Exception, post, 'testsecret',
'api.segment.io/', batch=[])

def test_datetime_serialization(self):
data = { 'created': datetime(2012, 3, 4, 5, 6, 7, 891011) }
data = {'created': datetime(2012, 3, 4, 5, 6, 7, 891011)}
result = json.dumps(data, cls=DatetimeSerializer)
self.assertEqual(result, '{"created": "2012-03-04T05:06:07.891011"}')

@@ -44,7 +46,7 @@ def test_should_not_timeout(self):

def test_should_timeout(self):
with self.assertRaises(requests.ReadTimeout):
res = post('testsecret', batch=[{
post('testsecret', batch=[{
'userId': 'userId',
'event': 'python event',
'type': 'track'
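test_datetime_serialization above depends on DatetimeSerializer rendering datetimes as ISO-8601. A plausible sketch of such an encoder, an assumption rather than the module's exact code, reproduces the asserted output:

    import json
    from datetime import date, datetime

    class DatetimeSerializer(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj, (date, datetime)):
                return obj.isoformat()  # ISO-8601, as the assertion expects
            return super().default(obj)

    data = {'created': datetime(2012, 3, 4, 5, 6, 7, 891011)}
    print(json.dumps(data, cls=DatetimeSerializer))
    # {"created": "2012-03-04T05:06:07.891011"}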