From 1f2c6069ee41555f11e8fc526c549087be7bbcd2 Mon Sep 17 00:00:00 2001 From: Brad Pitcher Date: Mon, 25 Apr 2016 08:28:37 -0700 Subject: [PATCH] reduce number of parallel data retrieving tasks --- fitapp/tasks.py | 42 +++++++++++++------------- fitapp/tests/test_integration.py | 22 ++++++-------- fitapp/tests/test_retrieval.py | 51 ++++++++++++++------------------ fitapp/views.py | 33 ++++++++++----------- 4 files changed, 69 insertions(+), 79 deletions(-) diff --git a/fitapp/tasks.py b/fitapp/tasks.py index 46ad9d3..3111045 100644 --- a/fitapp/tasks.py +++ b/fitapp/tasks.py @@ -46,24 +46,22 @@ def unsubscribe(*args, **kwargs): raise Reject(exc, requeue=False) - @shared_task -def get_time_series_data(fitbit_user, cat, resource, date=None): +def get_time_series_data(fitbit_user, categories=[], date=None): """ Get the user's time series data """ - try: - _type = TimeSeriesDataType.objects.get(category=cat, resource=resource) - except TimeSeriesDataType.DoesNotExist: - logger.exception("The resource %s in category %s doesn't exist" % ( - resource, cat)) + filters = {'category__in': categories} if categories else {} + types = TimeSeriesDataType.objects.filter(**filters) + if not types.exists(): + logger.exception("Couldn't find the time series data types") raise Reject(sys.exc_info()[1], requeue=False) # Create a lock so we don't try to run the same task multiple times sdat = date.strftime('%Y-%m-%d') if date else 'ALL' - lock_id = '{0}-lock-{1}-{2}-{3}'.format(__name__, fitbit_user, _type, sdat) + cats = '-'.join('%s' % i for i in categories) + lock_id = '{0}-lock-{1}-{2}-{3}'.format(__name__, fitbit_user, cats, sdat) if not cache.add(lock_id, 'true', LOCK_EXPIRE): - logger.debug('Already retrieving %s data for date %s, user %s' % ( - _type, fitbit_user, sdat)) + logger.debug('Already working on %s' % lock_id) raise Ignore() fbusers = UserFitbit.objects.filter(fitbit_user=fitbit_user) @@ -72,16 +70,15 @@ def get_time_series_data(fitbit_user, cat, resource, date=None): dates = {'base_date': date, 'end_date': date} try: for fbuser in fbusers: - data = utils.get_fitbit_data(fbuser, _type, **dates) - for datum in data: - # Create new record or update existing record - date = parser.parse(datum['dateTime']) - tsd, created = TimeSeriesData.objects.get_or_create( - user=fbuser.user, resource_type=_type, date=date) - tsd.value = datum['value'] - tsd.save() - # Release the lock - cache.delete(lock_id) + for _type in types: + data = utils.get_fitbit_data(fbuser, _type, **dates) + for datum in data: + # Create new record or update existing record + date = parser.parse(datum['dateTime']) + tsd, created = TimeSeriesData.objects.get_or_create( + user=fbuser.user, resource_type=_type, date=date) + tsd.value = datum['value'] + tsd.save() except HTTPTooManyRequests: # We have hit the rate limit for the user, retry when it's reset, # according to the reply from the failing API call @@ -93,7 +90,7 @@ def get_time_series_data(fitbit_user, cat, resource, date=None): # If the resource is elevation or floors, we are just getting this # error because the data doesn't exist for this user, so we can ignore # the error - if not ('elevation' in resource or 'floors' in resource): + if not ('elevation' in _type.resource or 'floors' in _type.resource): exc = sys.exc_info()[1] logger.exception("Exception updating data: %s" % exc) raise Reject(exc, requeue=False) @@ -101,3 +98,6 @@ def get_time_series_data(fitbit_user, cat, resource, date=None): exc = sys.exc_info()[1] logger.exception("Exception updating data: %s" % exc) raise Reject(exc, requeue=False) + finally: + # Release the lock + cache.delete(lock_id) diff --git a/fitapp/tests/test_integration.py b/fitapp/tests/test_integration.py index e88e7c7..ae1a7d3 100644 --- a/fitapp/tests/test_integration.py +++ b/fitapp/tests/test_integration.py @@ -154,13 +154,9 @@ def test_complete(self, tsd_apply_async, sub_apply_async): response, utils.get_setting('FITAPP_LOGIN_REDIRECT')) fbuser = UserFitbit.objects.get() sub_apply_async.assert_called_once_with( - (fbuser.fitbit_user, settings.FITAPP_SUBSCRIBER_ID), countdown=5) - tsdts = TimeSeriesDataType.objects.all() - self.assertEqual(tsd_apply_async.call_count, tsdts.count()) - for i, _type in enumerate(tsdts): - tsd_apply_async.assert_any_call( - (fbuser.fitbit_user, _type.category, _type.resource,), - countdown=10 + (i * 5)) + (fbuser.fitbit_user, settings.FITAPP_SUBSCRIBER_ID), countdown=1) + tsd_apply_async.assert_called_once_with( + (fbuser.fitbit_user,), countdown=1) self.assertEqual(fbuser.user, self.user) self.assertEqual(fbuser.access_token, self.token['access_token']) self.assertEqual(fbuser.refresh_token, self.token['refresh_token']) @@ -204,9 +200,9 @@ def test_next(self, tsd_apply_async, sub_apply_async): self.assertRedirectsNoFollow(response, '/test') fbuser = UserFitbit.objects.get() sub_apply_async.assert_called_once_with( - (fbuser.fitbit_user, settings.FITAPP_SUBSCRIBER_ID), countdown=5) - self.assertEqual( - tsd_apply_async.call_count, TimeSeriesDataType.objects.count()) + (fbuser.fitbit_user, settings.FITAPP_SUBSCRIBER_ID), countdown=1) + tsd_apply_async.assert_called_once_with( + (fbuser.fitbit_user,), countdown=1) self.assertEqual(fbuser.user, self.user) self.assertEqual(fbuser.access_token, self.token['access_token']) self.assertEqual(fbuser.refresh_token, self.token['refresh_token']) @@ -251,9 +247,9 @@ def test_integrated(self, tsd_apply_async, sub_apply_async): client_kwargs=self.token, get_kwargs={'code': self.code}) fbuser = UserFitbit.objects.get() sub_apply_async.assert_called_with( - (fbuser.fitbit_user, settings.FITAPP_SUBSCRIBER_ID), countdown=5) - self.assertEqual(tsd_apply_async.call_count, - TimeSeriesDataType.objects.count()) + (fbuser.fitbit_user, settings.FITAPP_SUBSCRIBER_ID), countdown=1) + tsd_apply_async.assert_called_once_with( + (fbuser.fitbit_user,), countdown=1) self.assertEqual(fbuser.user, self.user) self.assertEqual(fbuser.access_token, self.token['access_token']) self.assertEqual(fbuser.refresh_token, self.token['refresh_token']) diff --git a/fitapp/tests/test_retrieval.py b/fitapp/tests/test_retrieval.py index 2cc0a20..ead66c2 100644 --- a/fitapp/tests/test_retrieval.py +++ b/fitapp/tests/test_retrieval.py @@ -134,15 +134,13 @@ def test_subscription_update(self, get_fitbit_data): # from Fitbit. get_fitbit_data.return_value = [{'value': self.value}] category = getattr(TimeSeriesDataType, self.category) - resources = TimeSeriesDataType.objects.filter(category=category) self._receive_fitbit_updates() - self.assertEqual(get_fitbit_data.call_count, resources.count()) - # Check that the cache locks have been deleted - for resource in resources: - self.assertEqual( - cache.get('fitapp.get_time_series_data-lock-%s-%s-%s' % ( - category, resource.resource, self.date) - ), None) + self.assertEqual(get_fitbit_data.call_count, 1) + # Check that the cache lock has been deleted + self.assertEqual( + cache.get('fitapp.get_time_series_data-lock-%s-%s-%s' % ( + self.fbuser.fitbit_user, category, self.date) + ), None) date = parser.parse(self.date) for tsd in TimeSeriesData.objects.filter(user=self.user, date=date): assert tsd.value, self.value @@ -153,15 +151,13 @@ def test_subscription_update_file(self, get_fitbit_data): # is received from Fitbit. get_fitbit_data.return_value = [{'value': self.value}] category = getattr(TimeSeriesDataType, self.category) - resources = TimeSeriesDataType.objects.filter(category=category) self._receive_fitbit_updates(file=True) - self.assertEqual(get_fitbit_data.call_count, resources.count()) - # Check that the cache locks have been deleted - for resource in resources: - self.assertEqual( - cache.get('fitapp.get_time_series_data-lock-%s-%s-%s' % ( - category, resource.resource, self.date) - ), None) + self.assertEqual(get_fitbit_data.call_count, 1) + # Check that the cache lock has been deleted + self.assertEqual( + cache.get('fitapp.get_time_series_data-lock-%s-%s-%s' % ( + self.fbuser.fitbit_user, category, self.date) + ), None) date = parser.parse(self.date) for tsd in TimeSeriesData.objects.filter(user=self.user, date=date): assert tsd.value, self.value @@ -181,15 +177,15 @@ def test_subscription_update_locked(self, mock_add, get_fitbit_data): @patch('fitapp.utils.get_fitbit_data') def test_subscription_update_too_many(self, get_fitbit_data): # Check that celery tasks get postponed if the rate limit is hit - cat_id = getattr(TimeSeriesDataType, self.category) - _type = TimeSeriesDataType.objects.filter(category=cat_id)[0] - lock_id = 'fitapp.tasks-lock-{0}-{1}-{2}'.format( - self.fbuser.fitbit_user, _type, self.date) exc = fitbit_exceptions.HTTPTooManyRequests(self._error_response()) exc.retry_after_secs = 21 + category = getattr(TimeSeriesDataType, self.category) + def side_effect(*args, **kwargs): # Delete the cache lock after the first try and adjust the # get_fitbit_data mock to be successful + lock_id = 'fitapp.tasks-lock-{0}-{1}-{2}'.format( + self.fbuser.fitbit_user, category, self.date) cache.delete(lock_id) get_fitbit_data.side_effect = None get_fitbit_data.return_value = [{ @@ -198,18 +194,17 @@ def side_effect(*args, **kwargs): }] raise exc get_fitbit_data.side_effect = side_effect - category = getattr(TimeSeriesDataType, self.category) - resources = TimeSeriesDataType.objects.filter(category=category) self.assertEqual(TimeSeriesData.objects.count(), 0) result = get_time_series_data.apply_async( - (self.fbuser.fitbit_user, _type.category, _type.resource,), - {'date': parser.parse(self.date)}) + (self.fbuser.fitbit_user,), + {'categories': [category], 'date': parser.parse(self.date)}) result.get() # Since celery is in eager mode, we expect a Retry exception first - # and then a second task execution that is successful - self.assertEqual(get_fitbit_data.call_count, 2) - self.assertEqual(TimeSeriesData.objects.count(), 1) - self.assertEqual(TimeSeriesData.objects.get().value, '34') + # and then task executions for each resource in the category to be + # successful + self.assertEqual(get_fitbit_data.call_count, 24) + self.assertEqual(TimeSeriesData.objects.count(), 23) + self.assertEqual(TimeSeriesData.objects.all()[0].value, '34') def test_problem_queueing_task(self): get_time_series_data = MagicMock() diff --git a/fitapp/views.py b/fitapp/views.py index f3873aa..535bc1f 100644 --- a/fitapp/views.py +++ b/fitapp/views.py @@ -102,14 +102,9 @@ def complete(request): SUBSCRIBER_ID = utils.get_setting('FITAPP_SUBSCRIBER_ID') except ImproperlyConfigured: return redirect(reverse('fitbit-error')) - subscribe.apply_async((fbuser.fitbit_user, SUBSCRIBER_ID), countdown=5) - # Create tasks for all data in all data types - for i, _type in enumerate(TimeSeriesDataType.objects.all()): - # Delay execution for a few seconds to speed up response - # Offset each call by 2 seconds so they don't bog down the server - get_time_series_data.apply_async( - (fbuser.fitbit_user, _type.category, _type.resource,), - countdown=10 + (i * 5)) + subscribe.apply_async((fbuser.fitbit_user, SUBSCRIBER_ID), countdown=1) + # Create a task to retrieve all historical time series data + get_time_series_data.apply_async((fbuser.fitbit_user,), countdown=1) next_url = request.session.pop('fitbit_next', None) or utils.get_setting( 'FITAPP_LOGIN_REDIRECT') @@ -220,17 +215,21 @@ def update(request): raise Http404 try: - # Create a celery task for each data type in the update + # Create a celery task to get data for all categories in the update + user_categories = {} for update in updates: + owner_id = update['ownerId'] + if owner_id not in user_categories: + user_categories[owner_id] = [] cat = getattr(TimeSeriesDataType, update['collectionType']) - resources = TimeSeriesDataType.objects.filter(category=cat) - for i, _type in enumerate(resources): - # Offset each call by 2 seconds so they don't bog down the - # server - get_time_series_data.apply_async( - (update['ownerId'], _type.category, _type.resource,), - {'date': parser.parse(update['date'])}, - countdown=(2 * i)) + user_categories[owner_id].append(cat) + for owner_id, categories in user_categories.items(): + kwargs = { + 'categories': categories, + 'date': parser.parse(update['date']) + } + get_time_series_data.apply_async( + (owner_id,), kwargs, countdown=1) except (KeyError, ValueError, OverflowError): raise Http404