Skip to content

Commit

Permalink
Reduce the number of parallel data-retrieving tasks
Browse files — browse the repository at this point in the history
  • Loading branch information
brad committed Apr 25, 2016
1 parent ae6b524 commit 1f2c606
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 79 deletions.
42 changes: 21 additions & 21 deletions fitapp/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,22 @@ def unsubscribe(*args, **kwargs):
raise Reject(exc, requeue=False)



@shared_task
def get_time_series_data(fitbit_user, cat, resource, date=None):
def get_time_series_data(fitbit_user, categories=[], date=None):
""" Get the user's time series data """

try:
_type = TimeSeriesDataType.objects.get(category=cat, resource=resource)
except TimeSeriesDataType.DoesNotExist:
logger.exception("The resource %s in category %s doesn't exist" % (
resource, cat))
filters = {'category__in': categories} if categories else {}
types = TimeSeriesDataType.objects.filter(**filters)
if not types.exists():
logger.exception("Couldn't find the time series data types")
raise Reject(sys.exc_info()[1], requeue=False)

# Create a lock so we don't try to run the same task multiple times
sdat = date.strftime('%Y-%m-%d') if date else 'ALL'
lock_id = '{0}-lock-{1}-{2}-{3}'.format(__name__, fitbit_user, _type, sdat)
cats = '-'.join('%s' % i for i in categories)
lock_id = '{0}-lock-{1}-{2}-{3}'.format(__name__, fitbit_user, cats, sdat)
if not cache.add(lock_id, 'true', LOCK_EXPIRE):
logger.debug('Already retrieving %s data for date %s, user %s' % (
_type, fitbit_user, sdat))
logger.debug('Already working on %s' % lock_id)
raise Ignore()

fbusers = UserFitbit.objects.filter(fitbit_user=fitbit_user)
Expand All @@ -72,16 +70,15 @@ def get_time_series_data(fitbit_user, cat, resource, date=None):
dates = {'base_date': date, 'end_date': date}
try:
for fbuser in fbusers:
data = utils.get_fitbit_data(fbuser, _type, **dates)
for datum in data:
# Create new record or update existing record
date = parser.parse(datum['dateTime'])
tsd, created = TimeSeriesData.objects.get_or_create(
user=fbuser.user, resource_type=_type, date=date)
tsd.value = datum['value']
tsd.save()
# Release the lock
cache.delete(lock_id)
for _type in types:
data = utils.get_fitbit_data(fbuser, _type, **dates)
for datum in data:
# Create new record or update existing record
date = parser.parse(datum['dateTime'])
tsd, created = TimeSeriesData.objects.get_or_create(
user=fbuser.user, resource_type=_type, date=date)
tsd.value = datum['value']
tsd.save()
except HTTPTooManyRequests:
# We have hit the rate limit for the user, retry when it's reset,
# according to the reply from the failing API call
Expand All @@ -93,11 +90,14 @@ def get_time_series_data(fitbit_user, cat, resource, date=None):
# If the resource is elevation or floors, we are just getting this
# error because the data doesn't exist for this user, so we can ignore
# the error
if not ('elevation' in resource or 'floors' in resource):
if not ('elevation' in _type.resource or 'floors' in _type.resource):
exc = sys.exc_info()[1]
logger.exception("Exception updating data: %s" % exc)
raise Reject(exc, requeue=False)
except Exception:
exc = sys.exc_info()[1]
logger.exception("Exception updating data: %s" % exc)
raise Reject(exc, requeue=False)
finally:
# Release the lock
cache.delete(lock_id)
22 changes: 9 additions & 13 deletions fitapp/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,9 @@ def test_complete(self, tsd_apply_async, sub_apply_async):
response, utils.get_setting('FITAPP_LOGIN_REDIRECT'))
fbuser = UserFitbit.objects.get()
sub_apply_async.assert_called_once_with(
(fbuser.fitbit_user, settings.FITAPP_SUBSCRIBER_ID), countdown=5)
tsdts = TimeSeriesDataType.objects.all()
self.assertEqual(tsd_apply_async.call_count, tsdts.count())
for i, _type in enumerate(tsdts):
tsd_apply_async.assert_any_call(
(fbuser.fitbit_user, _type.category, _type.resource,),
countdown=10 + (i * 5))
(fbuser.fitbit_user, settings.FITAPP_SUBSCRIBER_ID), countdown=1)
tsd_apply_async.assert_called_once_with(
(fbuser.fitbit_user,), countdown=1)
self.assertEqual(fbuser.user, self.user)
self.assertEqual(fbuser.access_token, self.token['access_token'])
self.assertEqual(fbuser.refresh_token, self.token['refresh_token'])
Expand Down Expand Up @@ -204,9 +200,9 @@ def test_next(self, tsd_apply_async, sub_apply_async):
self.assertRedirectsNoFollow(response, '/test')
fbuser = UserFitbit.objects.get()
sub_apply_async.assert_called_once_with(
(fbuser.fitbit_user, settings.FITAPP_SUBSCRIBER_ID), countdown=5)
self.assertEqual(
tsd_apply_async.call_count, TimeSeriesDataType.objects.count())
(fbuser.fitbit_user, settings.FITAPP_SUBSCRIBER_ID), countdown=1)
tsd_apply_async.assert_called_once_with(
(fbuser.fitbit_user,), countdown=1)
self.assertEqual(fbuser.user, self.user)
self.assertEqual(fbuser.access_token, self.token['access_token'])
self.assertEqual(fbuser.refresh_token, self.token['refresh_token'])
Expand Down Expand Up @@ -251,9 +247,9 @@ def test_integrated(self, tsd_apply_async, sub_apply_async):
client_kwargs=self.token, get_kwargs={'code': self.code})
fbuser = UserFitbit.objects.get()
sub_apply_async.assert_called_with(
(fbuser.fitbit_user, settings.FITAPP_SUBSCRIBER_ID), countdown=5)
self.assertEqual(tsd_apply_async.call_count,
TimeSeriesDataType.objects.count())
(fbuser.fitbit_user, settings.FITAPP_SUBSCRIBER_ID), countdown=1)
tsd_apply_async.assert_called_once_with(
(fbuser.fitbit_user,), countdown=1)
self.assertEqual(fbuser.user, self.user)
self.assertEqual(fbuser.access_token, self.token['access_token'])
self.assertEqual(fbuser.refresh_token, self.token['refresh_token'])
Expand Down
51 changes: 23 additions & 28 deletions fitapp/tests/test_retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,13 @@ def test_subscription_update(self, get_fitbit_data):
# from Fitbit.
get_fitbit_data.return_value = [{'value': self.value}]
category = getattr(TimeSeriesDataType, self.category)
resources = TimeSeriesDataType.objects.filter(category=category)
self._receive_fitbit_updates()
self.assertEqual(get_fitbit_data.call_count, resources.count())
# Check that the cache locks have been deleted
for resource in resources:
self.assertEqual(
cache.get('fitapp.get_time_series_data-lock-%s-%s-%s' % (
category, resource.resource, self.date)
), None)
self.assertEqual(get_fitbit_data.call_count, 1)
# Check that the cache lock has been deleted
self.assertEqual(
cache.get('fitapp.get_time_series_data-lock-%s-%s-%s' % (
self.fbuser.fitbit_user, category, self.date)
), None)
date = parser.parse(self.date)
for tsd in TimeSeriesData.objects.filter(user=self.user, date=date):
assert tsd.value, self.value
Expand All @@ -153,15 +151,13 @@ def test_subscription_update_file(self, get_fitbit_data):
# is received from Fitbit.
get_fitbit_data.return_value = [{'value': self.value}]
category = getattr(TimeSeriesDataType, self.category)
resources = TimeSeriesDataType.objects.filter(category=category)
self._receive_fitbit_updates(file=True)
self.assertEqual(get_fitbit_data.call_count, resources.count())
# Check that the cache locks have been deleted
for resource in resources:
self.assertEqual(
cache.get('fitapp.get_time_series_data-lock-%s-%s-%s' % (
category, resource.resource, self.date)
), None)
self.assertEqual(get_fitbit_data.call_count, 1)
# Check that the cache lock has been deleted
self.assertEqual(
cache.get('fitapp.get_time_series_data-lock-%s-%s-%s' % (
self.fbuser.fitbit_user, category, self.date)
), None)
date = parser.parse(self.date)
for tsd in TimeSeriesData.objects.filter(user=self.user, date=date):
assert tsd.value, self.value
Expand All @@ -181,15 +177,15 @@ def test_subscription_update_locked(self, mock_add, get_fitbit_data):
@patch('fitapp.utils.get_fitbit_data')
def test_subscription_update_too_many(self, get_fitbit_data):
# Check that celery tasks get postponed if the rate limit is hit
cat_id = getattr(TimeSeriesDataType, self.category)
_type = TimeSeriesDataType.objects.filter(category=cat_id)[0]
lock_id = 'fitapp.tasks-lock-{0}-{1}-{2}'.format(
self.fbuser.fitbit_user, _type, self.date)
exc = fitbit_exceptions.HTTPTooManyRequests(self._error_response())
exc.retry_after_secs = 21
category = getattr(TimeSeriesDataType, self.category)

def side_effect(*args, **kwargs):
# Delete the cache lock after the first try and adjust the
# get_fitbit_data mock to be successful
lock_id = 'fitapp.tasks-lock-{0}-{1}-{2}'.format(
self.fbuser.fitbit_user, category, self.date)
cache.delete(lock_id)
get_fitbit_data.side_effect = None
get_fitbit_data.return_value = [{
Expand All @@ -198,18 +194,17 @@ def side_effect(*args, **kwargs):
}]
raise exc
get_fitbit_data.side_effect = side_effect
category = getattr(TimeSeriesDataType, self.category)
resources = TimeSeriesDataType.objects.filter(category=category)
self.assertEqual(TimeSeriesData.objects.count(), 0)
result = get_time_series_data.apply_async(
(self.fbuser.fitbit_user, _type.category, _type.resource,),
{'date': parser.parse(self.date)})
(self.fbuser.fitbit_user,),
{'categories': [category], 'date': parser.parse(self.date)})
result.get()
# Since celery is in eager mode, we expect a Retry exception first
# and then a second task execution that is successful
self.assertEqual(get_fitbit_data.call_count, 2)
self.assertEqual(TimeSeriesData.objects.count(), 1)
self.assertEqual(TimeSeriesData.objects.get().value, '34')
# and then task executions for each resource in the category to be
# successful
self.assertEqual(get_fitbit_data.call_count, 24)
self.assertEqual(TimeSeriesData.objects.count(), 23)
self.assertEqual(TimeSeriesData.objects.all()[0].value, '34')

def test_problem_queueing_task(self):
get_time_series_data = MagicMock()
Expand Down
33 changes: 16 additions & 17 deletions fitapp/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,9 @@ def complete(request):
SUBSCRIBER_ID = utils.get_setting('FITAPP_SUBSCRIBER_ID')
except ImproperlyConfigured:
return redirect(reverse('fitbit-error'))
subscribe.apply_async((fbuser.fitbit_user, SUBSCRIBER_ID), countdown=5)
# Create tasks for all data in all data types
for i, _type in enumerate(TimeSeriesDataType.objects.all()):
# Delay execution for a few seconds to speed up response
# Offset each call by 2 seconds so they don't bog down the server
get_time_series_data.apply_async(
(fbuser.fitbit_user, _type.category, _type.resource,),
countdown=10 + (i * 5))
subscribe.apply_async((fbuser.fitbit_user, SUBSCRIBER_ID), countdown=1)
# Create a task to retrieve all historical time series data
get_time_series_data.apply_async((fbuser.fitbit_user,), countdown=1)

next_url = request.session.pop('fitbit_next', None) or utils.get_setting(
'FITAPP_LOGIN_REDIRECT')
Expand Down Expand Up @@ -220,17 +215,21 @@ def update(request):
raise Http404

try:
# Create a celery task for each data type in the update
# Create a celery task to get data for all categories in the update
user_categories = {}
for update in updates:
owner_id = update['ownerId']
if owner_id not in user_categories:
user_categories[owner_id] = []
cat = getattr(TimeSeriesDataType, update['collectionType'])
resources = TimeSeriesDataType.objects.filter(category=cat)
for i, _type in enumerate(resources):
# Offset each call by 2 seconds so they don't bog down the
# server
get_time_series_data.apply_async(
(update['ownerId'], _type.category, _type.resource,),
{'date': parser.parse(update['date'])},
countdown=(2 * i))
user_categories[owner_id].append(cat)
for owner_id, categories in user_categories.items():
kwargs = {
'categories': categories,
'date': parser.parse(update['date'])
}
get_time_series_data.apply_async(
(owner_id,), kwargs, countdown=1)
except (KeyError, ValueError, OverflowError):
raise Http404

Expand Down

0 comments on commit 1f2c606

Please sign in to comment.