Skip to content

Commit

Permalink
Proof of concept: Notify users after returning a 202 Accepted?
Browse files Browse the repository at this point in the history
Task returns "Content-Location" header along with 202
Content-Location points to new /pending end-point
Upon task completion /pending end-point return 303 and "Content-Location" header
Pending Content-Location header returns uri of created/updated resource.
  • Loading branch information
TomBaxter committed Jun 29, 2017
1 parent 425a329 commit 1cead29
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 13 deletions.
11 changes: 10 additions & 1 deletion waterbutler/server/api/v1/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from raven.contrib.tornado import SentryMixin

from waterbutler import tasks
from waterbutler.server import utils
from waterbutler.server import utils, settings
from waterbutler.core import exceptions


Expand All @@ -24,7 +24,16 @@ def write_error(self, status_code, exc_info):
finish_args = [exc.data] if exc.data else [{'code': exc.code, 'message': exc.message}]
elif issubclass(etype, tasks.WaitTimeOutError):
self.set_status(202)
pending_url = '{}/pending/{}/{}'.format(settings.DOMAIN, exc.args[1], exc.args[0])
self.add_header('Content-Location', pending_url)
exception_kwargs = {'data': {'level': 'info'}}
finish_args = [{'data': {'attributes': {'status': 'Accepted',
'id': exc.args[0],
'resource': exc.args[1]
},
'links': {'status': pending_url}
}
}]
else:
finish_args = [{'code': status_code, 'message': self._reason}]

Expand Down
6 changes: 4 additions & 2 deletions waterbutler/server/api/v1/provider/movecopy.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,15 @@ async def move_or_copy(self):
if not getattr(self.provider, 'can_intra_' + action)(self.dest_provider, self.path):
# this weird signature syntax courtesy of py3.4 not liking trailing commas on kwargs
conflict = self.json.get('conflict', DEFAULT_CONFLICT)
task_args = self.build_args()
result = await getattr(tasks, action).adelay(
rename=self.json.get('rename'),
conflict=conflict,
request=remote_logging._serialize_request(self.request),
*self.build_args()
*task_args
)
metadata, created = await tasks.wait_on_celery(result)
metadata, created = await tasks.wait_on_celery(result,
result_resource=task_args[1]['nid'])
else:
metadata, created = (
await tasks.backgrounded(
Expand Down
3 changes: 2 additions & 1 deletion waterbutler/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ def make_app(debug):
app = tornado.web.Application(
api_to_handlers(v0) +
api_to_handlers(v1) +
[(r'/status', handlers.StatusHandler)],
[(r'/status', handlers.StatusHandler),
(r'/pending/(?P<result_resource>(?:\w|\d)+)/(?P<task_id>(?:\w|\d|-)+)', handlers.PendingHandler)],
debug=debug,
)
app.sentry_client = AsyncSentryClient(settings.SENTRY_DSN, release=waterbutler.__version__)
Expand Down
40 changes: 40 additions & 0 deletions waterbutler/server/handlers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import tornado.web
from celery.result import AsyncResult
from celery import Celery

import waterbutler
from waterbutler.tasks import settings as tasks_settings

app = Celery()
app.config_from_object(tasks_settings)


class StatusHandler(tornado.web.RequestHandler):
Expand All @@ -11,3 +17,37 @@ def get(self):
'status': 'up',
'version': waterbutler.__version__
})


class PendingHandler(tornado.web.RequestHandler):

def get(self, task_id, result_resource):
# TKB need to cover all possible states
app = Celery()
app.config_from_object(tasks_settings)
result = AsyncResult(id=task_id, app=app)
if str(result.ready()) == 'True':
if str(result.state) == 'SUCCESS':
meta, created = result.get(timeout=3)
self.set_status(303)
self.add_header('Content-Location', '{}://{}/v1/resources/{}/providers/{}{}'.format(self.request.protocol, self.request.host, result_resource, meta.provider, meta.path))
self.write({'data':
{'task_id': task_id,
'state': str(result.state),
'ready': str(result.ready()),
}
})
elif str(result.state) == 'FAILURE':
self.set_status(200)
self.write({'errors':
{'status': result.error_code,
'source': {'pointer': self.url},
'title': result.error_msg,
'detail': result.error_dtl
}
})
else:
self.write({
'task_id': task_id,
'state': str(result.state),
})
4 changes: 2 additions & 2 deletions waterbutler/tasks/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def celery_task(func, *args, **kwargs):


@backgroundify
async def wait_on_celery(result, interval=None, timeout=None, basepath=None):
async def wait_on_celery(result, result_resource=None, interval=None, timeout=None, basepath=None):
timeout = timeout or settings.WAIT_TIMEOUT
interval = interval or settings.WAIT_INTERVAL
basepath = basepath or settings.ADHOC_BACKEND_PATH
Expand All @@ -123,6 +123,6 @@ async def wait_on_celery(result, interval=None, timeout=None, basepath=None):
return result.result

if waited > timeout:
raise exceptions.WaitTimeOutError
raise exceptions.WaitTimeOutError(str(result.id), str(result_resource))
await asyncio.sleep(interval)
waited += interval
18 changes: 11 additions & 7 deletions waterbutler/tasks/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,19 @@
os.environ.get('RABBITMQ_PORT_5672_TCP_PORT', ''),
)
)
CELERY_RESULT_BACKEND = config.get(
'CELERY_RESULT_BACKEND',
'{}://{}:{}/{}'.format(
os.environ.get('CELERY_RESULT_BACKEND_PROTO', ''),
os.environ.get('CELERY_RESULT_BACKEND_TCP_ADDR', ''),
os.environ.get('CELERY_RESULT_BACKEND_TCP_PORT', ''),
os.environ.get('CELERY_RESULT_BACKEND_ID', ''),
)
)

WAIT_TIMEOUT = int(config.get('WAIT_TIMEOUT', 15))
# For testing 202 response
# WAIT_TIMEOUT = int(config.get('WAIT_TIMEOUT', 1))
WAIT_INTERVAL = float(config.get('WAIT_INTERVAL', 0.5))
ADHOC_BACKEND_PATH = config.get('ADHOC_BACKEND_PATH', '/tmp')

Expand All @@ -25,12 +36,6 @@
CELERY_QUEUES = (
Queue('waterbutler', Exchange('waterbutler'), routing_key='waterbutler'),
)
# CELERY_ALWAYS_EAGER = config.get('CELERY_ALWAYS_EAGER', True)
CELERY_ALWAYS_EAGER = config.get_bool('CELERY_ALWAYS_EAGER', False)
# CELERY_RESULT_BACKEND = config.get('CELERY_RESULT_BACKEND', 'redis://')
CELERY_RESULT_BACKEND = config.get_nullable('CELERY_RESULT_BACKEND', None)
CELERY_DISABLE_RATE_LIMITS = config.get_bool('CELERY_DISABLE_RATE_LIMITS', True)
CELERY_TASK_RESULT_EXPIRES = int(config.get('CELERY_TASK_RESULT_EXPIRES', 60))
CELERY_IMPORTS = [
entry.module_name
for entry in iter_entry_points(group='waterbutler.providers.tasks', name=None)
Expand All @@ -41,4 +46,3 @@

CELERY_ACKS_LATE = True
CELERYD_HIJACK_ROOT_LOGGER = False
CELERY_EAGER_PROPAGATES_EXCEPTIONS = True

0 comments on commit 1cead29

Please sign in to comment.