From 767186395420b5bf9c0d3007a1867f8edc34a360 Mon Sep 17 00:00:00 2001 From: Lars Holmberg Date: Tue, 15 Sep 2020 15:40:10 +0200 Subject: [PATCH 1/5] Make Users aware of which gevent group they are in, instead of forcing callers of User.stop() to specify it. Add some type hints. --- locust/runners.py | 7 ++++--- locust/test/test_locust_class.py | 6 +++--- locust/user/users.py | 22 +++++++++++----------- 3 files changed, 18 insertions(+), 17 deletions(-) diff --git a/locust/runners.py b/locust/runners.py index c4ab2f870c..368f8aa270 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -12,6 +12,7 @@ import psutil from gevent.pool import Group +from . import User from .log import greenlet_exception_logger from .rpc import Message, rpc from .stats import RequestStats, setup_distributed_stats_event_listeners @@ -228,15 +229,15 @@ def stop_users(self, user_count, stop_rate=None): stop_group = Group() while True: - user_to_stop = to_stop.pop(random.randint(0, len(to_stop) - 1)) + user_to_stop: User = to_stop.pop(random.randint(0, len(to_stop) - 1)) logger.debug("Stopping %s" % user_to_stop._greenlet.name) if self.environment.stop_timeout: - if not user_to_stop.stop(self.user_greenlets, force=False): + if not user_to_stop.stop(force=False): # User.stop() returns False if the greenlet was not stopped, so we'll need # to add it's greenlet to our stopping Group so we can wait for it to finish it's task stop_group.add(user_to_stop._greenlet) else: - user_to_stop.stop(self.user_greenlets, force=True) + user_to_stop.stop(force=True) if to_stop: gevent.sleep(sleep_time) else: diff --git a/locust/test/test_locust_class.py b/locust/test/test_locust_class.py index 2bb79cfe6d..6122cafffe 100644 --- a/locust/test/test_locust_class.py +++ b/locust/test/test_locust_class.py @@ -225,7 +225,7 @@ class MyUser(User): user = MyUser(self.environment) user.start(group) sleep(0.05) - user.stop(group) + user.stop() sleep(0) self.assertTrue(user.t2_executed) @@ -526,7 +526,7 @@ def t(self):
self.assertEqual(1, user.test_state) # stop User gracefully - user.stop(group, force=False) + user.stop(force=False) sleep(0) # make sure instance is not killed right away self.assertIn(greenlet, group) @@ -555,7 +555,7 @@ def t(self): self.assertEqual(1, user.test_state) # stop User gracefully - user.stop(group, force=True) + user.stop(force=True) sleep(0) # make sure instance is killed right away, and that the task did NOT get to finish self.assertEqual(0, len(group)) diff --git a/locust/user/users.py b/locust/user/users.py index b85c1642a6..61006ba427 100644 --- a/locust/user/users.py +++ b/locust/user/users.py @@ -1,5 +1,5 @@ from gevent import GreenletExit, greenlet - +from gevent.pool import Group from locust.clients import HttpSession from locust.exception import LocustError, StopUser from locust.util import deprecation @@ -111,7 +111,8 @@ class ForumPage(TaskSet): client = NoClientWarningRaiser() _state = None - _greenlet = None + _greenlet: greenlet.Greenlet = None + _group: Group _taskset_instance = None def __init__(self, environment): @@ -154,11 +155,11 @@ def wait(self): """ self._taskset_instance.wait() - def start(self, gevent_group): + def start(self, group: Group): """ Start a greenlet that runs this User instance. - :param gevent_group: Group instance where the greenlet will be spawned. + :param group: Group instance where the greenlet will be spawned. :type gevent_group: gevent.pool.Group :returns: The spawned greenlet. """ @@ -171,15 +172,14 @@ def run_user(user): """ user.run() - self._greenlet = gevent_group.spawn(run_user, self) + self._greenlet = group.spawn(run_user, self) + self._group = group return self._greenlet - def stop(self, gevent_group, force=False): + def stop(self, force=False): """ - Stop the user greenlet that exists in the gevent_group. + Stop the user greenlet. - :param gevent_group: Group instance where the greenlet will be spawned. 
- :type gevent_group: gevent.pool.Group :param force: If False (the default) the stopping is done gracefully by setting the state to LOCUST_STATE_STOPPING which will make the User instance stop once any currently running task is complete and on_stop methods are called. If force is True the greenlet will be killed immediately. @@ -187,10 +187,10 @@ def stop(self, gevent_group, force=False): """ if self._greenlet is greenlet.getcurrent(): # the user is stopping itself (from within a task), so blocking would deadlock - gevent_group.killone(self._greenlet, block=False) + self._group.killone(self._greenlet, block=False) return True elif force or self._state == LOCUST_STATE_WAITING: - gevent_group.killone(self._greenlet) + self._group.killone(self._greenlet) return True elif self._state == LOCUST_STATE_RUNNING: self._state = LOCUST_STATE_STOPPING From 2545786412bc6662f81c1e90f533528c00736ee0 Mon Sep 17 00:00:00 2001 From: Lars Holmberg Date: Tue, 15 Sep 2020 22:37:12 +0200 Subject: [PATCH 2/5] Refactor: inline _check_stop_condition. It is only ever used in three places, and (because its main use is to throw an exception) it makes things a little hard to understand. --- locust/user/task.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/locust/user/task.py b/locust/user/task.py index 2c79780518..9dfb68a840 100644 --- a/locust/user/task.py +++ b/locust/user/task.py @@ -280,7 +280,8 @@ def run(self): self.schedule_task(self.get_next_task()) try: - self._check_stop_condition() + if self.user._state == LOCUST_STATE_STOPPING: + raise StopUser() self.execute_next_task() except RescheduleTaskImmediately: pass @@ -372,19 +373,17 @@ def wait(self): set a stop_timeout. If this behaviour is not desired you should make the user wait using gevent.sleep() instead. 
""" - self._check_stop_condition() + if self.user._state == LOCUST_STATE_STOPPING: + raise StopUser() self.user._state = LOCUST_STATE_WAITING self._sleep(self.wait_time()) - self._check_stop_condition() + if self.user._state == LOCUST_STATE_STOPPING: + raise StopUser() self.user._state = LOCUST_STATE_RUNNING def _sleep(self, seconds): gevent.sleep(seconds) - def _check_stop_condition(self): - if self.user._state == LOCUST_STATE_STOPPING: - raise StopUser() - def interrupt(self, reschedule=True): """ Interrupt the TaskSet and hand over execution control back to the parent TaskSet. From 46fe8c40f4d6c0c9decefdf960ec020e14249c42 Mon Sep 17 00:00:00 2001 From: Lars Holmberg Date: Tue, 15 Sep 2020 22:41:56 +0200 Subject: [PATCH 3/5] Add failing test case for #1552 Rename a test case. --- locust/test/test_runners.py | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index a3ce70bdc5..f58c006f44 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -4,6 +4,7 @@ import gevent from gevent import sleep from gevent.queue import Queue +import greenlet import locust from locust import runners, between, constant, LoadTestShape @@ -360,7 +361,7 @@ def test_runner_reference_on_environment(self): self.assertEqual(env, runner.environment) self.assertEqual(runner, env.runner) - def test_users_can_call_runner_quit(self): + def test_users_can_call_runner_quit_without_deadlocking(self): class BaseUser(User): wait_time = constant(0) @@ -379,6 +380,28 @@ def trigger(self): finally: timeout.cancel() + def test_runner_quit_does_not_get_blocked_by_slow_on_stop(self): + class BaseUser(User): + wait_time = constant(0) + + @task + def trigger(self): + pass + + def on_stop(self): + gevent.sleep(0.2) + + runner = Environment(user_classes=[BaseUser]).create_local_runner() + runner.spawn_users(10, 10, wait=False) + timeout = gevent.Timeout(0.4) + timeout.start() + try: + 
runner.quit() + except gevent.Timeout: + self.fail("Got Timeout exception, runner must have hung somehow.") + finally: + timeout.cancel() + def test_stop_users_with_spawn_rate(self): class MyUser(User): wait_time = constant(1) From f6db44eeaedead3599bcdf38eb6614930ce1ed8e Mon Sep 17 00:00:00 2001 From: Lars Holmberg Date: Tue, 15 Sep 2020 22:57:27 +0200 Subject: [PATCH 4/5] Fix #1552 by using separate greenlets to call .stop(), to allow them to run in parallel. --- locust/runners.py | 24 +++++++++++++++--------- locust/user/users.py | 6 +----- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/locust/runners.py b/locust/runners.py index 368f8aa270..a2980408d6 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -9,6 +9,7 @@ from time import time import gevent +import greenlet import psutil from gevent.pool import Group @@ -18,6 +19,8 @@ from .stats import RequestStats, setup_distributed_stats_event_listeners from .exception import RPCError +from .user.task import LOCUST_STATE_STOPPING + logger = logging.getLogger(__name__) @@ -225,25 +228,28 @@ def stop_users(self, user_count, stop_rate=None): sleep_time = 1.0 / stop_rate logger.info("Stopping %i users at rate of %g users/s" % (user_count, stop_rate)) - if self.environment.stop_timeout: - stop_group = Group() + async_calls_to_stop = Group() + stop_group = Group() while True: user_to_stop: User = to_stop.pop(random.randint(0, len(to_stop) - 1)) logger.debug("Stopping %s" % user_to_stop._greenlet.name) - if self.environment.stop_timeout: - if not user_to_stop.stop(force=False): - # User.stop() returns False if the greenlet was not stopped, so we'll need - # to add it's greenlet to our stopping Group so we can wait for it to finish it's task - stop_group.add(user_to_stop._greenlet) + if user_to_stop._greenlet is greenlet.getcurrent(): + # User called runner.quit(), so dont block waiting for killing to finish" + user_to_stop._group.killone(user_to_stop._greenlet, block=False) + elif 
self.environment.stop_timeout: + async_calls_to_stop.add(gevent.spawn_later(0, User.stop, user_to_stop, force=False)) + stop_group.add(user_to_stop._greenlet) else: - user_to_stop.stop(force=True) + async_calls_to_stop.add(gevent.spawn_later(0, User.stop, user_to_stop, force=True)) if to_stop: gevent.sleep(sleep_time) else: break - if self.environment.stop_timeout and not stop_group.join(timeout=self.environment.stop_timeout): + async_calls_to_stop.join() + + if not stop_group.join(timeout=self.environment.stop_timeout): logger.info( "Not all users finished their tasks & terminated in %s seconds. Stopping them..." % self.environment.stop_timeout diff --git a/locust/user/users.py b/locust/user/users.py index 61006ba427..356cf40c41 100644 --- a/locust/user/users.py +++ b/locust/user/users.py @@ -185,11 +185,7 @@ def stop(self, force=False): methods are called. If force is True the greenlet will be killed immediately. :returns: True if the greenlet was killed immediately, otherwise False """ - if self._greenlet is greenlet.getcurrent(): - # the user is stopping itself (from within a task), so blocking would deadlock - self._group.killone(self._greenlet, block=False) - return True - elif force or self._state == LOCUST_STATE_WAITING: + if force or self._state == LOCUST_STATE_WAITING: self._group.killone(self._greenlet) return True elif self._state == LOCUST_STATE_RUNNING: From 2c1cdcfb5228b79f7b2c71a1e5ba8a609fe763b0 Mon Sep 17 00:00:00 2001 From: Lars Holmberg Date: Tue, 15 Sep 2020 23:12:57 +0200 Subject: [PATCH 5/5] Assert that every user actually triggered on_stop, even in special cases. 
--- locust/test/test_runners.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index f58c006f44..b28c43acc4 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -364,11 +364,15 @@ def test_runner_reference_on_environment(self): def test_users_can_call_runner_quit_without_deadlocking(self): class BaseUser(User): wait_time = constant(0) + stop_triggered = False @task def trigger(self): self.environment.runner.quit() + def on_stop(self): + BaseUser.stop_triggered = True + runner = Environment(user_classes=[BaseUser]).create_local_runner() runner.spawn_users(1, 1, wait=False) timeout = gevent.Timeout(0.5) @@ -380,20 +384,24 @@ def trigger(self): finally: timeout.cancel() - def test_runner_quit_does_not_get_blocked_by_slow_on_stop(self): + self.assertTrue(BaseUser.stop_triggered) + + def test_runner_quit_can_run_on_stop_for_multiple_users_concurrently(self): class BaseUser(User): wait_time = constant(0) + stop_count = 0 @task def trigger(self): pass def on_stop(self): - gevent.sleep(0.2) + gevent.sleep(0.1) + BaseUser.stop_count += 1 runner = Environment(user_classes=[BaseUser]).create_local_runner() runner.spawn_users(10, 10, wait=False) - timeout = gevent.Timeout(0.4) + timeout = gevent.Timeout(0.3) timeout.start() try: runner.quit() @@ -402,6 +410,8 @@ def on_stop(self): finally: timeout.cancel() + self.assertEqual(10, BaseUser.stop_count) # verify that all users executed on_stop + def test_stop_users_with_spawn_rate(self): class MyUser(User): wait_time = constant(1)