Skip to content

Commit

Permalink
Merge pull request #1502 from max-rocket-internet/ramp_down
Browse files Browse the repository at this point in the history
Allow ramping down of users
  • Loading branch information
cyberw authored Aug 14, 2020
2 parents 4833395 + dc70aa0 commit 19b2dc0
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 28 deletions.
64 changes: 41 additions & 23 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,19 +168,21 @@ def hatch():
logger.debug("%i users hatched" % len(self.user_greenlets))
if bucket:
gevent.sleep(sleep_time)

hatch()
if wait:
self.user_greenlets.join()
logger.info("All users stopped\n")

def stop_users(self, user_count):
def stop_users(self, user_count, stop_rate=None):
"""
Stop a stop_count of weighted users from the Group() object in self.users
Stop `user_count` weighted users at a rate of `stop_rate`
"""
if user_count == 0 or stop_rate == 0:
return

bucket = self.weight_users(user_count)
user_count = len(bucket)
logger.info("Stopping %i users" % user_count)
to_stop = []
for g in self.user_greenlets:
for l in bucket:
Expand All @@ -189,25 +191,41 @@ def stop_users(self, user_count):
to_stop.append(user)
bucket.remove(l)
break
self.stop_user_instances(to_stop)
self.environment.events.hatch_complete.fire(user_count=self.user_count)


def stop_user_instances(self, users):

if not to_stop:
return

if stop_rate == None or stop_rate >= user_count:
sleep_time = 0
logger.info("Stopping %i users immediately" % (user_count))
else:
sleep_time = 1.0 / stop_rate
logger.info("Stopping %i users at rate of %g users/s" % (user_count, stop_rate))

if self.environment.stop_timeout:
stopping = Group()
for user in users:
if not user.stop(self.user_greenlets, force=False):
stop_group = Group()

while True:
user_to_stop = to_stop.pop(random.randint(0, len(to_stop)-1))
logger.debug('Stopping %s' % user_to_stop._greenlet.name)
if self.environment.stop_timeout:
if not user_to_stop.stop(self.user_greenlets, force=False):
# User.stop() returns False if the greenlet was not stopped, so we'll need
# to add it's greenlet to our stopping Group so we can wait for it to finish it's task
stopping.add(user._greenlet)
if not stopping.join(timeout=self.environment.stop_timeout):
logger.info("Not all users finished their tasks & terminated in %s seconds. Stopping them..." % self.environment.stop_timeout)
stopping.kill(block=True)
else:
for user in users:
user.stop(self.user_greenlets, force=True)
stop_group.add(user_to_stop._greenlet)
else:
user_to_stop.stop(self.user_greenlets, force=True)
if to_stop:
gevent.sleep(sleep_time)
else:
break

if self.environment.stop_timeout and not stop_group.join(timeout=self.environment.stop_timeout):
logger.info("Not all users finished their tasks & terminated in %s seconds. Stopping them..." % self.environment.stop_timeout)
stop_group.kill(block=True)

logger.info("%i Users have been stopped" % user_count)

def monitor_cpu(self):
process = psutil.Process()
while True:
Expand All @@ -234,13 +252,13 @@ def start(self, user_count, hatch_rate, wait=False):
self.worker_cpu_warning_emitted = False
self.target_user_count = user_count

# Dynamically changing the user count
if self.state != STATE_INIT and self.state != STATE_STOPPED:
logger.debug("Updating running test with %d users, %.2f hatch rate and wait=%r" % (user_count, hatch_rate, wait))
self.state = STATE_HATCHING
if self.user_count > user_count:
# Stop some users
stop_count = self.user_count - user_count
self.stop_users(stop_count)
self.stop_users(stop_count, hatch_rate)
elif self.user_count < user_count:
# Spawn some users
spawn_count = user_count - self.user_count
Expand Down Expand Up @@ -284,10 +302,10 @@ def stop(self):
# if we are currently hatching users we need to kill the hatching greenlet first
if self.hatching_greenlet and not self.hatching_greenlet.ready():
self.hatching_greenlet.kill(block=True)
self.stop_user_instances([g.args[0] for g in self.user_greenlets])
self.stop_users(self.user_count)
self.state = STATE_STOPPED
self.cpu_log_warning()

def quit(self):
"""
Stop any running load test and kill all greenlets for the runner
Expand Down
6 changes: 3 additions & 3 deletions locust/templates/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
<a href="#" class="close_link">Close</a>
</div>
<div class="padder">
<h2>Start new Locust run</h2>
<h2>Start new load test</h2>
<form action="./swarm" method="POST" id="swarm_form">
<label for="user_count">Number of total users to simulate</label>
<input type="text" name="user_count" id="user_count" class="val" value="{{ num_users or "" }}"/><br>
Expand Down Expand Up @@ -86,11 +86,11 @@ <h2>Start new Locust run</h2>
<a href="#" class="close_link">Close</a>
</div>
<div class="padder">
<h2>Change the locust count</h2>
<h2>Edit running load test</h2>
<form action="./swarm" method="POST" id="edit_form">
<label for="new_user_count">Number of users to simulate</label>
<input type="text" name="user_count" id="new_user_count" class="val" value="{{ num_users or "" }}"/><br>
<label for="hatch_rate">Hatch rate <span style="color:#8a8a8a;">(users spawned/second)</span></label>
<label for="hatch_rate">Hatch rate <span style="color:#8a8a8a;">(users spawned or stopped per second)</span></label>
<input type="text" name="hatch_rate" id="new_hatch_rate" class="val" value="{{ hatch_rate or "" }}"/><br>
{% if is_step_load %}
<label for="step_user_count">Number of users to increase by step</label>
Expand Down
61 changes: 59 additions & 2 deletions locust/test/test_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,33 @@ def trigger(self):
finally:
timeout.cancel()

def test_stop_users_with_hatch_rate(self):
class MyUser(User):
wait_time = constant(1)
@task
def my_task(self):
pass

environment = Environment(user_classes=[MyUser])
runner = LocalRunner(environment)

# Start load test, wait for users to start, then trigger ramp down
runner.start(10, 10, wait=False)
sleep(1)
runner.start(2, 4, wait=False)

# Wait a moment and then ensure the user count has started to drop but
# not immediately to user_count
sleep(1)
user_count = len(runner.user_greenlets)
self.assertTrue(user_count > 5, "User count has decreased too quickly: %i" % user_count)
self.assertTrue(user_count < 10, "User count has not decreased at all: %i" % user_count)

# Wait and ensure load test users eventually dropped to desired count
sleep(2)
user_count = len(runner.user_greenlets)
self.assertTrue(user_count == 2, "User count has not decreased correctly to 2, it is : %i" % user_count)


class TestMasterWorkerRunners(LocustTestCase):
def test_distributed_integration_run(self):
Expand Down Expand Up @@ -784,7 +811,7 @@ def test_spawn_fewer_locusts_than_workers(self):
num_users += msg.data["num_users"]

self.assertEqual(2, num_users, "Total number of locusts that would have been spawned is not 2")

def test_spawn_locusts_in_stepload_mode(self):
with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
master = self.get_runner()
Expand Down Expand Up @@ -943,7 +970,7 @@ def on_test_start(**kw):
self.assertEqual(2, MyTestUser._test_state)
# make sure the test_start was never fired on the worker
self.assertFalse(test_start_run[0])

def test_worker_without_stop_timeout(self):
class MyTestUser(User):
_test_state = 0
Expand Down Expand Up @@ -1265,3 +1292,33 @@ def on_test_stop_fail(*args, **kwargs):
self.assertEqual(0, test_stop_run[0])
runner.stop()
self.assertEqual(2, test_stop_run[0])

def test_stop_timeout_with_ramp_down(self):
class MyTaskSet(TaskSet):
@task
def my_task(self):
gevent.sleep(1)

class MyTestUser(User):
tasks = [MyTaskSet]
wait_time = constant(0)

environment = Environment(user_classes=[MyTestUser], stop_timeout=2)
runner = environment.create_local_runner()

# Start load test, wait for users to start, then trigger ramp down
runner.start(10, 10, wait=False)
sleep(1)
runner.start(2, 4, wait=False)

# Wait a moment and then ensure the user count has started to drop but
# not immediately to user_count
sleep(1)
user_count = len(runner.user_greenlets)
self.assertTrue(user_count > 5, "User count has decreased too quickly: %i" % user_count)
self.assertTrue(user_count < 10, "User count has not decreased at all: %i" % user_count)

# Wait and ensure load test users eventually dropped to desired count
sleep(2)
user_count = len(runner.user_greenlets)
self.assertTrue(user_count == 2, "User count has not decreased correctly to 2, it is : %i" % user_count)

0 comments on commit 19b2dc0

Please sign in to comment.