Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PubSub: Policy.on_exception actually used to make consumer go inactive. #4472

Merged
merged 4 commits into from
Nov 29, 2017

Conversation

dhermes
Copy link
Contributor

@dhermes dhermes commented Nov 28, 2017

Towards #4463.

@dhermes dhermes added the api: pubsub Issues related to the Pub/Sub API. label Nov 28, 2017
@googlebot googlebot added the cla: yes This human has signed the Contributor License Agreement. label Nov 28, 2017
bool: ``True`` if this subscription is opened with this future,
``False`` otherwise.
bool: :data:`True` if this subscription is opened with this
future, :data:`False` otherwise.

This comment was marked as spam.

This comment was marked as spam.

policy.on_response.side_effect = exc

consumer = _consumer.Consumer(policy=policy)
policy.on_exception.side_effect = OnException(consumer._exiting)

This comment was marked as spam.

@@ -244,10 +244,9 @@ def _blocking_consume(self):
self.stop_consuming()
except Exception as exc:
try:
self._policy.on_exception(exc)
self.active = self._policy.on_exception(exc)
except:
self.active = False

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

@@ -244,10 +244,9 @@ def _blocking_consume(self):
self.stop_consuming()
except Exception as exc:
try:
self._policy.on_exception(exc)
self.active = self._policy.on_exception(exc)

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

@dhermes
Copy link
Contributor Author

dhermes commented Nov 28, 2017

OK I have updated the gist to be much clearer as to how things were behaving and how they behave now. Most notable:

  • Heartbeats where running=True, done=True in the logs_pypi.txt (happens immediately after thread.Policy.on_exception is called)
  • Lease management (e.g. "Snoozing lease management...") ceases in logs_local.txt immediately after the heartbeats change to running=False, done=True. Just to be sure, we have 40 seconds worth of heartbeats before stopping the main thread and there is still no lease management happening

This keeps the thread active after one failure but
stops after a second.
@dhermes
Copy link
Contributor Author

dhermes commented Nov 29, 2017

Remaining point of discussion (got buried below the fold):

... AFAICT thread.Policy.close() (which gets called by BasePolicy.lease()) is what cleans up (it's the only thing that can stop Consumer._blocking_consume since Consumer.stop_consuming is the only thing that calls self._exiting.set()).

@chemelnucfin chemelnucfin changed the title Policy.on_exception actually used to make consumer go inactive. PubSub: Policy.on_exception actually used to make consumer go inactive. Nov 29, 2017
@dhermes
Copy link
Contributor Author

dhermes commented Nov 29, 2017

OK my previous remark seems to be wrong, instead it seems that BasePolicy.maintain_leases() is where the action is.

@dhermes
Copy link
Contributor Author

dhermes commented Nov 29, 2017

I am going to merge this since it "moves the bar" towards correctness even though I know it isn't 100% correct. E.g. I can still trigger "set_exception can only be called once." by using a subscription that doesn't exist.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: pubsub Issues related to the Pub/Sub API. cla: yes This human has signed the Contributor License Agreement.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants