Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reduce number of parallel data retrieving tasks #33

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 21 additions & 21 deletions fitapp/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,22 @@ def unsubscribe(*args, **kwargs):
raise Reject(exc, requeue=False)



@shared_task
def get_time_series_data(fitbit_user, cat, resource, date=None):
def get_time_series_data(fitbit_user, categories=[], date=None):
""" Get the user's time series data """

try:
_type = TimeSeriesDataType.objects.get(category=cat, resource=resource)
except TimeSeriesDataType.DoesNotExist:
logger.exception("The resource %s in category %s doesn't exist" % (
resource, cat))
filters = {'category__in': categories} if categories else {}
types = TimeSeriesDataType.objects.filter(**filters)
if not types.exists():
logger.exception("Couldn't find the time series data types")
raise Reject(sys.exc_info()[1], requeue=False)

# Create a lock so we don't try to run the same task multiple times
sdat = date.strftime('%Y-%m-%d') if date else 'ALL'
lock_id = '{0}-lock-{1}-{2}-{3}'.format(__name__, fitbit_user, _type, sdat)
cats = '-'.join('%s' % i for i in categories)
lock_id = '{0}-lock-{1}-{2}-{3}'.format(__name__, fitbit_user, cats, sdat)
if not cache.add(lock_id, 'true', LOCK_EXPIRE):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brad I don't think we can use the Django cache for the lock and guarantee that it will work with the various setups that people are likely to have. For example, Django's default cache backend is local-memory caching, which is a per-process cache. Depending on the Celery setup, this code can be executed by more than one process, each of which would have its own cache and would not be able to see the locks created by the other workers.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@grokcode What can I do then? Would it be safe to get rid of this lock and decorate get_fitbit_data with @transaction.atomic()?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brad I think the easiest solution is to punt on it for now and make a note in the README here that the Fitbit tasks shouldn't be run concurrently, and then give an example of a way to set up Celery to do that. I think we can use Celery's manual routing feature to create a new queue and then, when starting Celery, make sure there is only one thread working on that queue.

It would be much nicer to support concurrent tasks (but trickier too). I think we can do the locking with the DB. One idea is to store the lock in the DB, use the @transaction.atomic() decorator like you said, and use Django's select_for_update to acquire the lock. I think it would be enough to have one lock per user so that only one task per user can execute at a time. That way we shouldn't have multiple processes trying to renew the token at the same time and stepping on each other.

logger.debug('Already retrieving %s data for date %s, user %s' % (
_type, fitbit_user, sdat))
logger.debug('Already working on %s' % lock_id)
raise Ignore()

fbusers = UserFitbit.objects.filter(fitbit_user=fitbit_user)
Expand All @@ -72,16 +70,15 @@ def get_time_series_data(fitbit_user, cat, resource, date=None):
dates = {'base_date': date, 'end_date': date}
try:
for fbuser in fbusers:
data = utils.get_fitbit_data(fbuser, _type, **dates)
for datum in data:
# Create new record or update existing record
date = parser.parse(datum['dateTime'])
tsd, created = TimeSeriesData.objects.get_or_create(
user=fbuser.user, resource_type=_type, date=date)
tsd.value = datum['value']
tsd.save()
# Release the lock
cache.delete(lock_id)
for _type in types:
data = utils.get_fitbit_data(fbuser, _type, **dates)
for datum in data:
# Create new record or update existing record
date = parser.parse(datum['dateTime'])
tsd, created = TimeSeriesData.objects.get_or_create(
user=fbuser.user, resource_type=_type, date=date)
tsd.value = datum['value']
tsd.save()
except HTTPTooManyRequests:
# We have hit the rate limit for the user, retry when it's reset,
# according to the reply from the failing API call
Expand All @@ -93,11 +90,14 @@ def get_time_series_data(fitbit_user, cat, resource, date=None):
# If the resource is elevation or floors, we are just getting this
# error because the data doesn't exist for this user, so we can ignore
# the error
if not ('elevation' in resource or 'floors' in resource):
if not ('elevation' in _type.resource or 'floors' in _type.resource):
exc = sys.exc_info()[1]
logger.exception("Exception updating data: %s" % exc)
raise Reject(exc, requeue=False)
except Exception:
exc = sys.exc_info()[1]
logger.exception("Exception updating data: %s" % exc)
raise Reject(exc, requeue=False)
finally:
# Release the lock
cache.delete(lock_id)
22 changes: 9 additions & 13 deletions fitapp/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,9 @@ def test_complete(self, tsd_apply_async, sub_apply_async):
response, utils.get_setting('FITAPP_LOGIN_REDIRECT'))
fbuser = UserFitbit.objects.get()
sub_apply_async.assert_called_once_with(
(fbuser.fitbit_user, settings.FITAPP_SUBSCRIBER_ID), countdown=5)
tsdts = TimeSeriesDataType.objects.all()
self.assertEqual(tsd_apply_async.call_count, tsdts.count())
for i, _type in enumerate(tsdts):
tsd_apply_async.assert_any_call(
(fbuser.fitbit_user, _type.category, _type.resource,),
countdown=10 + (i * 5))
(fbuser.fitbit_user, settings.FITAPP_SUBSCRIBER_ID), countdown=1)
tsd_apply_async.assert_called_once_with(
(fbuser.fitbit_user,), countdown=1)
self.assertEqual(fbuser.user, self.user)
self.assertEqual(fbuser.access_token, self.token['access_token'])
self.assertEqual(fbuser.refresh_token, self.token['refresh_token'])
Expand Down Expand Up @@ -204,9 +200,9 @@ def test_next(self, tsd_apply_async, sub_apply_async):
self.assertRedirectsNoFollow(response, '/test')
fbuser = UserFitbit.objects.get()
sub_apply_async.assert_called_once_with(
(fbuser.fitbit_user, settings.FITAPP_SUBSCRIBER_ID), countdown=5)
self.assertEqual(
tsd_apply_async.call_count, TimeSeriesDataType.objects.count())
(fbuser.fitbit_user, settings.FITAPP_SUBSCRIBER_ID), countdown=1)
tsd_apply_async.assert_called_once_with(
(fbuser.fitbit_user,), countdown=1)
self.assertEqual(fbuser.user, self.user)
self.assertEqual(fbuser.access_token, self.token['access_token'])
self.assertEqual(fbuser.refresh_token, self.token['refresh_token'])
Expand Down Expand Up @@ -251,9 +247,9 @@ def test_integrated(self, tsd_apply_async, sub_apply_async):
client_kwargs=self.token, get_kwargs={'code': self.code})
fbuser = UserFitbit.objects.get()
sub_apply_async.assert_called_with(
(fbuser.fitbit_user, settings.FITAPP_SUBSCRIBER_ID), countdown=5)
self.assertEqual(tsd_apply_async.call_count,
TimeSeriesDataType.objects.count())
(fbuser.fitbit_user, settings.FITAPP_SUBSCRIBER_ID), countdown=1)
tsd_apply_async.assert_called_once_with(
(fbuser.fitbit_user,), countdown=1)
self.assertEqual(fbuser.user, self.user)
self.assertEqual(fbuser.access_token, self.token['access_token'])
self.assertEqual(fbuser.refresh_token, self.token['refresh_token'])
Expand Down
51 changes: 23 additions & 28 deletions fitapp/tests/test_retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,13 @@ def test_subscription_update(self, get_fitbit_data):
# from Fitbit.
get_fitbit_data.return_value = [{'value': self.value}]
category = getattr(TimeSeriesDataType, self.category)
resources = TimeSeriesDataType.objects.filter(category=category)
self._receive_fitbit_updates()
self.assertEqual(get_fitbit_data.call_count, resources.count())
# Check that the cache locks have been deleted
for resource in resources:
self.assertEqual(
cache.get('fitapp.get_time_series_data-lock-%s-%s-%s' % (
category, resource.resource, self.date)
), None)
self.assertEqual(get_fitbit_data.call_count, 1)
# Check that the cache lock has been deleted
self.assertEqual(
cache.get('fitapp.get_time_series_data-lock-%s-%s-%s' % (
self.fbuser.fitbit_user, category, self.date)
), None)
date = parser.parse(self.date)
for tsd in TimeSeriesData.objects.filter(user=self.user, date=date):
assert tsd.value, self.value
Expand All @@ -153,15 +151,13 @@ def test_subscription_update_file(self, get_fitbit_data):
# is received from Fitbit.
get_fitbit_data.return_value = [{'value': self.value}]
category = getattr(TimeSeriesDataType, self.category)
resources = TimeSeriesDataType.objects.filter(category=category)
self._receive_fitbit_updates(file=True)
self.assertEqual(get_fitbit_data.call_count, resources.count())
# Check that the cache locks have been deleted
for resource in resources:
self.assertEqual(
cache.get('fitapp.get_time_series_data-lock-%s-%s-%s' % (
category, resource.resource, self.date)
), None)
self.assertEqual(get_fitbit_data.call_count, 1)
# Check that the cache lock has been deleted
self.assertEqual(
cache.get('fitapp.get_time_series_data-lock-%s-%s-%s' % (
self.fbuser.fitbit_user, category, self.date)
), None)
date = parser.parse(self.date)
for tsd in TimeSeriesData.objects.filter(user=self.user, date=date):
assert tsd.value, self.value
Expand All @@ -181,15 +177,15 @@ def test_subscription_update_locked(self, mock_add, get_fitbit_data):
@patch('fitapp.utils.get_fitbit_data')
def test_subscription_update_too_many(self, get_fitbit_data):
# Check that celery tasks get postponed if the rate limit is hit
cat_id = getattr(TimeSeriesDataType, self.category)
_type = TimeSeriesDataType.objects.filter(category=cat_id)[0]
lock_id = 'fitapp.tasks-lock-{0}-{1}-{2}'.format(
self.fbuser.fitbit_user, _type, self.date)
exc = fitbit_exceptions.HTTPTooManyRequests(self._error_response())
exc.retry_after_secs = 21
category = getattr(TimeSeriesDataType, self.category)

def side_effect(*args, **kwargs):
# Delete the cache lock after the first try and adjust the
# get_fitbit_data mock to be successful
lock_id = 'fitapp.tasks-lock-{0}-{1}-{2}'.format(
self.fbuser.fitbit_user, category, self.date)
cache.delete(lock_id)
get_fitbit_data.side_effect = None
get_fitbit_data.return_value = [{
Expand All @@ -198,18 +194,17 @@ def side_effect(*args, **kwargs):
}]
raise exc
get_fitbit_data.side_effect = side_effect
category = getattr(TimeSeriesDataType, self.category)
resources = TimeSeriesDataType.objects.filter(category=category)
self.assertEqual(TimeSeriesData.objects.count(), 0)
result = get_time_series_data.apply_async(
(self.fbuser.fitbit_user, _type.category, _type.resource,),
{'date': parser.parse(self.date)})
(self.fbuser.fitbit_user,),
{'categories': [category], 'date': parser.parse(self.date)})
result.get()
# Since celery is in eager mode, we expect a Retry exception first
# and then a second task execution that is successful
self.assertEqual(get_fitbit_data.call_count, 2)
self.assertEqual(TimeSeriesData.objects.count(), 1)
self.assertEqual(TimeSeriesData.objects.get().value, '34')
# and then task executions for each resource in the category to be
# successful
self.assertEqual(get_fitbit_data.call_count, 24)
self.assertEqual(TimeSeriesData.objects.count(), 23)
self.assertEqual(TimeSeriesData.objects.all()[0].value, '34')

def test_problem_queueing_task(self):
get_time_series_data = MagicMock()
Expand Down
33 changes: 16 additions & 17 deletions fitapp/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,9 @@ def complete(request):
SUBSCRIBER_ID = utils.get_setting('FITAPP_SUBSCRIBER_ID')
except ImproperlyConfigured:
return redirect(reverse('fitbit-error'))
subscribe.apply_async((fbuser.fitbit_user, SUBSCRIBER_ID), countdown=5)
# Create tasks for all data in all data types
for i, _type in enumerate(TimeSeriesDataType.objects.all()):
# Delay execution for a few seconds to speed up response
# Offset each call by 2 seconds so they don't bog down the server
get_time_series_data.apply_async(
(fbuser.fitbit_user, _type.category, _type.resource,),
countdown=10 + (i * 5))
subscribe.apply_async((fbuser.fitbit_user, SUBSCRIBER_ID), countdown=1)
# Create a task to retrieve all historical time series data
get_time_series_data.apply_async((fbuser.fitbit_user,), countdown=1)

next_url = request.session.pop('fitbit_next', None) or utils.get_setting(
'FITAPP_LOGIN_REDIRECT')
Expand Down Expand Up @@ -220,17 +215,21 @@ def update(request):
raise Http404

try:
# Create a celery task for each data type in the update
# Create a celery task to get data for all categories in the update
user_categories = {}
for update in updates:
owner_id = update['ownerId']
if owner_id not in user_categories:
user_categories[owner_id] = []
cat = getattr(TimeSeriesDataType, update['collectionType'])
resources = TimeSeriesDataType.objects.filter(category=cat)
for i, _type in enumerate(resources):
# Offset each call by 2 seconds so they don't bog down the
# server
get_time_series_data.apply_async(
(update['ownerId'], _type.category, _type.resource,),
{'date': parser.parse(update['date'])},
countdown=(2 * i))
user_categories[owner_id].append(cat)
for owner_id, categories in user_categories.items():
kwargs = {
'categories': categories,
'date': parser.parse(update['date'])
}
get_time_series_data.apply_async(
(owner_id,), kwargs, countdown=1)
except (KeyError, ValueError, OverflowError):
raise Http404

Expand Down