From 88b33207e0e4cde852815e059257f2c02a89348f Mon Sep 17 00:00:00 2001 From: Max Williams Date: Mon, 3 Aug 2020 17:10:51 +0200 Subject: [PATCH 1/6] Allow ramping down of users --- locust/runners.py | 65 +++++++++++++++++++++++-------------- locust/test/test_runners.py | 31 +++++++++++++++++- 2 files changed, 70 insertions(+), 26 deletions(-) diff --git a/locust/runners.py b/locust/runners.py index 89dd8b00ba..3394bf151a 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -168,19 +168,21 @@ def hatch(): logger.debug("%i users hatched" % len(self.user_greenlets)) if bucket: gevent.sleep(sleep_time) - + hatch() if wait: self.user_greenlets.join() logger.info("All users stopped\n") - def stop_users(self, user_count): + def stop_users(self, user_count, stop_rate=None): """ - Stop a stop_count of weighted users from the Group() object in self.users + Stop `user_count` weighted users at a rate of `stop_rate` """ + if user_count == 0 or stop_rate == 0: + return + bucket = self.weight_users(user_count) user_count = len(bucket) - logger.info("Stopping %i users" % user_count) to_stop = [] for g in self.user_greenlets: for l in bucket: @@ -189,25 +191,38 @@ def stop_users(self, user_count): to_stop.append(user) bucket.remove(l) break - self.stop_user_instances(to_stop) - self.environment.events.hatch_complete.fire(user_count=self.user_count) - - - def stop_user_instances(self, users): - if self.environment.stop_timeout: - stopping = Group() - for user in users: - if not user.stop(self.user_greenlets, force=False): + + if not to_stop: + return + + if stop_rate == None or user_count == stop_rate: + sleep_time = 0 + logger.info("Stopping %i users immediately" % (user_count)) + else: + sleep_time = 1.0 / stop_rate + logger.info("Stopping %i users at rate of %g users/s" % (user_count, stop_rate)) + + while True: + user_to_stop = to_stop.pop(random.randint(0, len(to_stop)-1)) + logger.debug('Stopping %s' % user_to_stop._greenlet.name) + if self.environment.stop_timeout: + stop_group = Group() + if not user_to_stop.stop(self.user_greenlets, force=False): # User.stop() returns False if the greenlet was not stopped, so we'll need # to add it's greenlet to our stopping Group so we can wait for it to finish it's task - stopping.add(user._greenlet) - if not stopping.join(timeout=self.environment.stop_timeout): - logger.info("Not all users finished their tasks & terminated in %s seconds. Stopping them..." % self.environment.stop_timeout) - stopping.kill(block=True) - else: - for user in users: - user.stop(self.user_greenlets, force=True) - + stop_group.add(user_to_stop._greenlet) + if not stop_group.join(timeout=self.environment.stop_timeout): + logger.info("Not all users finished their tasks & terminated in %s seconds. Stopping them..." % self.environment.stop_timeout) + stop_group.kill(block=True) + else: + user_to_stop.stop(self.user_greenlets, force=True) + if to_stop: + gevent.sleep(sleep_time) + else: + logger.info("%i Users have been stopped" % user_count) + break + + def monitor_cpu(self): process = psutil.Process() while True: @@ -234,13 +249,13 @@ def start(self, user_count, hatch_rate, wait=False): self.worker_cpu_warning_emitted = False self.target_user_count = user_count - # Dynamically changing the user count if self.state != STATE_INIT and self.state != STATE_STOPPED: + logger.debug("Updating running test with %d users, %.2f hatch rate and wait=%r" % (user_count, hatch_rate, wait)) self.state = STATE_HATCHING if self.user_count > user_count: # Stop some users stop_count = self.user_count - user_count - self.stop_users(stop_count) + self.stop_users(stop_count, hatch_rate) elif self.user_count < user_count: # Spawn some users spawn_count = user_count - self.user_count @@ -284,10 +299,10 @@ def stop(self): # if we are currently hatching users we need to kill the hatching greenlet first if self.hatching_greenlet and not self.hatching_greenlet.ready(): self.hatching_greenlet.kill(block=True) - self.stop_user_instances([g.args[0] for g in self.user_greenlets]) + self.stop_users(self.user_count) self.state = STATE_STOPPED self.cpu_log_warning() - + def quit(self): """ Stop any running load test and kill all greenlets for the runner diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index a0656de9ba..77a1ca4932 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -339,6 +339,35 @@ def trigger(self): finally: timeout.cancel() + def test_stop_users_with_hatch_rate(self): + class MyUser(User): + wait_time = constant(1) + @task + def my_task(self): + pass + + environment = Environment(user_classes=[MyUser]) + runner = LocalRunner(environment) + + # Start a new load test + runner.start(10, 10) + sleep(0.5) + + # Update the running test with less users and a slow hatch_rate + runner.start(2, 1) + + # Wait a moment and then ensure the user count has started to drop but + # not immediately to user_count + sleep(1) + user_count = len(runner.user_greenlets) + self.assertTrue(user_count > 2, "User count has decreased too quickly: %i" % user_count) + self.assertTrue(user_count < 10, "User count has not decreased at all: %i" % user_count) + + # Wait and ensure load test users eventually dropped to desired count + sleep(5) + user_count = len(runner.user_greenlets) + self.assertTrue(user_count == 2, "User count has not decreased correctly to 2, it is : %i" % user_count) + class TestMasterWorkerRunners(LocustTestCase): def test_distributed_integration_run(self): @@ -784,7 +813,7 @@ def test_spawn_fewer_locusts_than_workers(self): num_users += msg.data["num_users"] self.assertEqual(2, num_users, "Total number of locusts that would have been spawned is not 2") - + def test_spawn_locusts_in_stepload_mode(self): with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server: master = self.get_runner() From 7d047a4dd5d1994ee8e244d9283696483561a824 Mon Sep 17 00:00:00 2001 From: Max Williams Date: Tue, 4 Aug 2020 17:41:16 +0200 Subject: [PATCH 2/6] stop immediately if stop_rate is higher than user_count --- locust/runners.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/locust/runners.py b/locust/runners.py index 3394bf151a..2b2398f75a 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -195,7 +195,7 @@ def stop_users(self, user_count, stop_rate=None): if not to_stop: return - if stop_rate == None or user_count == stop_rate: + if stop_rate == None or stop_rate >= user_count: sleep_time = 0 logger.info("Stopping %i users immediately" % (user_count)) else: From 24026f7b51dceebb1fcee18f48c570137f1938a3 Mon Sep 17 00:00:00 2001 From: Max Williams Date: Mon, 10 Aug 2020 12:23:24 +0200 Subject: [PATCH 3/6] adjust stopping behaviour when stop_timeout is enabled --- locust/runners.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/locust/runners.py b/locust/runners.py index 2b2398f75a..92ab7cae6f 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -202,26 +202,29 @@ def stop_users(self, user_count, stop_rate=None): sleep_time = 1.0 / stop_rate logger.info("Stopping %i users at rate of %g users/s" % (user_count, stop_rate)) + if self.environment.stop_timeout: + stop_group = Group() + while True: user_to_stop = to_stop.pop(random.randint(0, len(to_stop)-1)) logger.debug('Stopping %s' % user_to_stop._greenlet.name) if self.environment.stop_timeout: - stop_group = Group() if not user_to_stop.stop(self.user_greenlets, force=False): # User.stop() returns False if the greenlet was not stopped, so we'll need # to add it's greenlet to our stopping Group so we can wait for it to finish it's task stop_group.add(user_to_stop._greenlet) - if not stop_group.join(timeout=self.environment.stop_timeout): - logger.info("Not all users finished their tasks & terminated in %s seconds. Stopping them..." % self.environment.stop_timeout) - stop_group.kill(block=True) else: user_to_stop.stop(self.user_greenlets, force=True) if to_stop: gevent.sleep(sleep_time) else: - logger.info("%i Users have been stopped" % user_count) break + if self.environment.stop_timeout and not stop_group.join(timeout=self.environment.stop_timeout): + logger.info("Not all users finished their tasks & terminated in %s seconds. Stopping them..." % self.environment.stop_timeout) + stop_group.kill(block=True) + + logger.info("%i Users have been stopped" % user_count) def monitor_cpu(self): process = psutil.Process() From ded61e0eeb64857b4b0ecfb3fd34ad8196abaa60 Mon Sep 17 00:00:00 2001 From: Max Williams Date: Mon, 10 Aug 2020 12:31:23 +0200 Subject: [PATCH 4/6] add new test for ramping down with stop_timeout enabled --- locust/test/test_runners.py | 40 ++++++++++++++++++++++++++++++------- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index 77a1ca4932..0d08efb6e8 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -349,12 +349,10 @@ def my_task(self): environment = Environment(user_classes=[MyUser]) runner = LocalRunner(environment) - # Start a new load test - runner.start(10, 10) - sleep(0.5) - - # Update the running test with less users and a slow hatch_rate - runner.start(2, 1) + # Start load test, wait for users to start, then trigger ramp down + runner.start(10, 10, wait=False) + gevent.sleep(1) + runner.start(2, 2, wait=False) # Wait a moment and then ensure the user count has started to drop but # not immediately to user_count @@ -972,7 +970,7 @@ def on_test_start(**kw): self.assertEqual(2, MyTestUser._test_state) # make sure the test_start was never fired on the worker self.assertFalse(test_start_run[0]) - + def test_worker_without_stop_timeout(self): class MyTestUser(User): _test_state = 0 @@ -1294,3 +1292,31 @@ def on_test_stop_fail(*args, **kwargs): self.assertEqual(0, test_stop_run[0]) runner.stop() self.assertEqual(2, test_stop_run[0]) + + def test_stop_timeout_with_ramp_down(self): + class MyTaskSet(TaskSet): + @task + def my_task(self): + gevent.sleep(1) + + class MyTestUser(User): + tasks = [MyTaskSet] + wait_time = constant(0) + + environment = Environment(user_classes=[MyTestUser], stop_timeout=2) + runner = environment.create_local_runner() + + # Start load test, wait for users to start, then trigger ramp down + runner.start(10, 10, wait=False) + gevent.sleep(1) + runner.start(2, 2, wait=False) + + sleep(2) + user_count = len(runner.user_greenlets) + self.assertTrue(user_count > 2, "User count has decreased too quickly: %i" % user_count) + self.assertTrue(user_count < 10, "User count has not decreased at all: %i" % user_count) + + # Wait and ensure load test users eventually dropped to desired count + sleep(5) + user_count = len(runner.user_greenlets) + self.assertTrue(user_count == 2, "User count has not decreased correctly to 2, it is : %i" % user_count) From 28fc8c3d491cd6aaead62edb76c41c665f9ec28f Mon Sep 17 00:00:00 2001 From: Max Williams Date: Tue, 11 Aug 2020 17:00:59 +0200 Subject: [PATCH 5/6] tighten all the test timing numbers --- locust/test/test_runners.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index 0d08efb6e8..a1dcf0943f 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -351,18 +351,18 @@ def my_task(self): # Start load test, wait for users to start, then trigger ramp down runner.start(10, 10, wait=False) - gevent.sleep(1) - runner.start(2, 2, wait=False) + sleep(1) + runner.start(2, 4, wait=False) # Wait a moment and then ensure the user count has started to drop but # not immediately to user_count sleep(1) user_count = len(runner.user_greenlets) - self.assertTrue(user_count > 2, "User count has decreased too quickly: %i" % user_count) + self.assertTrue(user_count > 5, "User count has decreased too quickly: %i" % user_count) self.assertTrue(user_count < 10, "User count has not decreased at all: %i" % user_count) - + # Wait and ensure load test users eventually dropped to desired count - sleep(5) + sleep(2) user_count = len(runner.user_greenlets) self.assertTrue(user_count == 2, "User count has not decreased correctly to 2, it is : %i" % user_count) @@ -1308,15 +1308,17 @@ class MyTestUser(User): # Start load test, wait for users to start, then trigger ramp down runner.start(10, 10, wait=False) - gevent.sleep(1) - runner.start(2, 2, wait=False) + sleep(1) + runner.start(2, 4, wait=False) - sleep(2) + # Wait a moment and then ensure the user count has started to drop but + # not immediately to user_count + sleep(1) user_count = len(runner.user_greenlets) - self.assertTrue(user_count > 2, "User count has decreased too quickly: %i" % user_count) + self.assertTrue(user_count > 5, "User count has decreased too quickly: %i" % user_count) self.assertTrue(user_count < 10, "User count has not decreased at all: %i" % user_count) # Wait and ensure load test users eventually dropped to desired count - sleep(5) + sleep(2) user_count = len(runner.user_greenlets) self.assertTrue(user_count == 2, "User count has not decreased correctly to 2, it is : %i" % user_count) From dc70aa0fe006fe0068bd084aca6c12af539bfa0e Mon Sep 17 00:00:00 2001 From: Max Williams Date: Fri, 14 Aug 2020 12:22:53 +0200 Subject: [PATCH 6/6] change wording in UI --- locust/templates/index.html | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/locust/templates/index.html b/locust/templates/index.html index 4b6f927031..147a32d357 100644 --- a/locust/templates/index.html +++ b/locust/templates/index.html @@ -56,7 +56,7 @@ Close
-

Start new Locust run

+

Start new load test


@@ -86,11 +86,11 @@

Start new Locust run

Close
-

Change the locust count

+

Edit running load test


- +
{% if is_step_load %}