make a decorator for lambda functions #191
Conversation
Fixes #192
@@ -332,9 +339,28 @@ def init(access_token, environment='production', **kw):
                             **SETTINGS['locals']['sizes'])
        _transforms.append(shortener)

_threads = queue.Queue()
Queue is thread safe
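A minimal sketch (not from the PR) of the point being made here: queue.Queue does its own internal locking, so multiple threads can call put() on the same instance with no extra synchronization.

```python
import queue
import threading

# queue.Queue handles its own locking, so many threads may
# call put() concurrently without any explicit lock.
q = queue.Queue()

def producer(n):
    q.put(n)  # safe from any thread

threads = [threading.Thread(target=producer, args=(i,)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# All eight items arrive intact, none lost or duplicated.
items = sorted(q.get() for _ in range(8))
print(items)
```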
@@ -415,6 +441,7 @@ def send_payload(payload, access_token):
    else:
        # default to 'thread'
        thread = threading.Thread(target=_send_payload, args=(payload, access_token))
        _threads.put(thread)
Add the thread to the queue when it is created, before it starts running
Good idea. I think this is actually going to be a great addition to the non-lambda use cases. Is there an easy way of using a thread pool here instead of creating a new thread for each call?
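One possible shape for the thread-pool idea raised here, sketched with concurrent.futures; the names _pool and _send_payload and the max_workers value are assumptions standing in for the module's internals, not code from the PR.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical sketch: a shared pool instead of one thread per call.
_pool = ThreadPoolExecutor(max_workers=4)

def _send_payload(payload, access_token):
    pass  # placeholder for the real network call

def send_payload(payload, access_token):
    # submit() reuses pooled worker threads rather than spawning
    # a fresh threading.Thread for every payload.
    return _pool.submit(_send_payload, payload, access_token)

future = send_payload({'data': 1}, 'token')
future.result()  # blocks until this one task finishes
```

The returned Future also gives callers a natural place to wait on or inspect individual sends, which a bare threading.Thread does not.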
@@ -445,6 +472,12 @@ def search_items(title, return_fields=None, access_token=None, endpoint=None, **
                      **search_fields)


def wait(f=None):
    _threads.join()
This join will return once task_done has been called on the queue the same number of times that put has been called. It blocks the calling thread, which cannot be any of the threads in the queue that need to call task_done, because we control all of those threads and they only execute _send_payload.
If we are not using threads, e.g. using the blocking handler, then _threads will be empty and join will return right away.
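A small self-contained demonstration of the join()/task_done() contract described above: join() returns once task_done() has balanced every put(), and on a queue that was never put() to it returns immediately.

```python
import queue
import threading

q = queue.Queue()
q.put(object())  # one pending "task"

def worker():
    q.get_nowait()
    q.task_done()  # balances the earlier put()

t = threading.Thread(target=worker)
t.start()
q.join()   # unblocks once the worker has called task_done()
t.join()

# The blocking-handler case: nothing was ever put(),
# so join() returns right away instead of blocking.
empty = queue.Queue()
empty.join()
```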
def wait(f=None):
    _threads.join()
    if f is not None:
        return f()
This allows wait to return a value, which we use to return the result at the end of the lambda function.
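A sketch of that wait(f) pattern in isolation, with a local _threads queue standing in for the module-level one:

```python
import queue

_threads = queue.Queue()  # stand-in for the module-level queue

def wait(f=None):
    _threads.join()   # block until all queued work is marked done
    if f is not None:
        return f()    # forward f's return value to the caller

# With an empty queue, join() returns immediately and the
# callable's result is passed straight through.
result = wait(lambda: 'handler result')
print(result)
```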
@@ -1133,6 +1166,11 @@ def _send_payload(payload, access_token):
        _post_api('item/', payload, access_token=access_token)
    except Exception as e:
        log.exception('Exception while posting item %r', e)
    try:
If we are using threads, this will be called on one of the threads created for sending a payload. get_nowait will remove a thread from the queue; it doesn't really matter which one it removes, as long as each thread calls this exactly once, because we don't do anything with the returned value. Again, since we are on one of those threads, calling task_done here can signal to join that it may return.
If we are not using threads, i.e. any other handler, then _threads will always be empty, get_nowait will throw queue.Empty, and we just move on.
I still don't understand the need for removing one of the threads here. What would happen if this code were not here?
@functools.wraps(f)
def wrapper(event, context):
    global _CURRENT_LAMBDA_CONTEXT
    _CURRENT_LAMBDA_CONTEXT = context
If lambda can run Python threads then this global is going to need to be a thread local.
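A sketch of the thread-local alternative suggested here, with hypothetical names (_local, set_context, get_context); each thread sees only its own stored context.

```python
import threading

_local = threading.local()  # per-thread storage

def set_context(ctx):
    _local.context = ctx

def get_context():
    return getattr(_local, 'context', None)

results = {}

def handler(name):
    set_context(name)
    # Only this thread's value is visible here, even if
    # other threads set a different context concurrently.
    results[name] = get_context()

threads = [threading.Thread(target=handler, args=(n,)) for n in ('a', 'b')]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)
```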
This implements a decorator, lambda_function, to be applied to a Lambda handler. If we comment out the raise Exception line, then everything will just run normally and the rollbar code will pass things through as necessary. If we do have an exception, then Rollbar will handle sending it to the API and waiting for any processing threads as necessary. If the handler happens to be set to another value like 'blocking', then the wait is superfluous, but otherwise the same outcome will be observed.
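The PR's actual usage example is not preserved on this page; the following is an illustrative stand-in showing the general shape of such a decorator, not the PR's implementation (the real one would report the exception to Rollbar and call wait before re-raising).

```python
import functools

def lambda_function(f):
    """Hypothetical sketch of a Lambda-handler decorator."""
    @functools.wraps(f)
    def wrapper(event, context):
        try:
            return f(event, context)
        except Exception:
            # The real decorator would report to Rollbar here
            # and wait() for any sending threads before re-raising.
            raise
    return wrapper

@lambda_function
def handler(event, context):
    # raise Exception('boom')  # commented out: runs normally
    return {'statusCode': 200}

print(handler({}, None))
```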