Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow ramping down of users #1502

Merged
merged 6 commits into from
Aug 14, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 41 additions & 23 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,19 +168,21 @@ def hatch():
logger.debug("%i users hatched" % len(self.user_greenlets))
if bucket:
gevent.sleep(sleep_time)

hatch()
if wait:
self.user_greenlets.join()
logger.info("All users stopped\n")

def stop_users(self, user_count):
def stop_users(self, user_count, stop_rate=None):
"""
Stop a stop_count of weighted users from the Group() object in self.users
Stop `user_count` weighted users at a rate of `stop_rate`
"""
if user_count == 0 or stop_rate == 0:
return

bucket = self.weight_users(user_count)
user_count = len(bucket)
logger.info("Stopping %i users" % user_count)
to_stop = []
for g in self.user_greenlets:
for l in bucket:
Expand All @@ -189,25 +191,41 @@ def stop_users(self, user_count):
to_stop.append(user)
bucket.remove(l)
break
self.stop_user_instances(to_stop)
self.environment.events.hatch_complete.fire(user_count=self.user_count)


def stop_user_instances(self, users):

if not to_stop:
return

if stop_rate == None or stop_rate >= user_count:
sleep_time = 0
logger.info("Stopping %i users immediately" % (user_count))
else:
sleep_time = 1.0 / stop_rate
logger.info("Stopping %i users at rate of %g users/s" % (user_count, stop_rate))

if self.environment.stop_timeout:
stopping = Group()
for user in users:
if not user.stop(self.user_greenlets, force=False):
stop_group = Group()

while True:
user_to_stop = to_stop.pop(random.randint(0, len(to_stop)-1))
logger.debug('Stopping %s' % user_to_stop._greenlet.name)
if self.environment.stop_timeout:
if not user_to_stop.stop(self.user_greenlets, force=False):
# User.stop() returns False if the greenlet was not stopped, so we'll need
# to add it's greenlet to our stopping Group so we can wait for it to finish it's task
stopping.add(user._greenlet)
if not stopping.join(timeout=self.environment.stop_timeout):
logger.info("Not all users finished their tasks & terminated in %s seconds. Stopping them..." % self.environment.stop_timeout)
stopping.kill(block=True)
else:
for user in users:
user.stop(self.user_greenlets, force=True)
stop_group.add(user_to_stop._greenlet)
else:
user_to_stop.stop(self.user_greenlets, force=True)
if to_stop:
gevent.sleep(sleep_time)
else:
break

if self.environment.stop_timeout and not stop_group.join(timeout=self.environment.stop_timeout):
logger.info("Not all users finished their tasks & terminated in %s seconds. Stopping them..." % self.environment.stop_timeout)
stop_group.kill(block=True)

logger.info("%i Users have been stopped" % user_count)

def monitor_cpu(self):
process = psutil.Process()
while True:
Expand All @@ -234,13 +252,13 @@ def start(self, user_count, hatch_rate, wait=False):
self.worker_cpu_warning_emitted = False
self.target_user_count = user_count

# Dynamically changing the user count
if self.state != STATE_INIT and self.state != STATE_STOPPED:
logger.debug("Updating running test with %d users, %.2f hatch rate and wait=%r" % (user_count, hatch_rate, wait))
self.state = STATE_HATCHING
if self.user_count > user_count:
# Stop some users
stop_count = self.user_count - user_count
self.stop_users(stop_count)
self.stop_users(stop_count, hatch_rate)
elif self.user_count < user_count:
# Spawn some users
spawn_count = user_count - self.user_count
Expand Down Expand Up @@ -284,10 +302,10 @@ def stop(self):
# if we are currently hatching users we need to kill the hatching greenlet first
if self.hatching_greenlet and not self.hatching_greenlet.ready():
self.hatching_greenlet.kill(block=True)
self.stop_user_instances([g.args[0] for g in self.user_greenlets])
self.stop_users(self.user_count)
self.state = STATE_STOPPED
self.cpu_log_warning()

def quit(self):
"""
Stop any running load test and kill all greenlets for the runner
Expand Down
61 changes: 59 additions & 2 deletions locust/test/test_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,33 @@ def trigger(self):
finally:
timeout.cancel()

def test_stop_users_with_hatch_rate(self):
class MyUser(User):
wait_time = constant(1)
@task
def my_task(self):
pass

environment = Environment(user_classes=[MyUser])
runner = LocalRunner(environment)

# Start load test, wait for users to start, then trigger ramp down
runner.start(10, 10, wait=False)
sleep(1)
runner.start(2, 4, wait=False)

# Wait a moment and then ensure the user count has started to drop but
# not immediately to user_count
sleep(1)
user_count = len(runner.user_greenlets)
self.assertTrue(user_count > 5, "User count has decreased too quickly: %i" % user_count)
self.assertTrue(user_count < 10, "User count has not decreased at all: %i" % user_count)

# Wait and ensure load test users eventually dropped to desired count
sleep(2)
user_count = len(runner.user_greenlets)
self.assertTrue(user_count == 2, "User count has not decreased correctly to 2, it is : %i" % user_count)


class TestMasterWorkerRunners(LocustTestCase):
def test_distributed_integration_run(self):
Expand Down Expand Up @@ -784,7 +811,7 @@ def test_spawn_fewer_locusts_than_workers(self):
num_users += msg.data["num_users"]

self.assertEqual(2, num_users, "Total number of locusts that would have been spawned is not 2")

def test_spawn_locusts_in_stepload_mode(self):
with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:
master = self.get_runner()
Expand Down Expand Up @@ -943,7 +970,7 @@ def on_test_start(**kw):
self.assertEqual(2, MyTestUser._test_state)
# make sure the test_start was never fired on the worker
self.assertFalse(test_start_run[0])

def test_worker_without_stop_timeout(self):
class MyTestUser(User):
_test_state = 0
Expand Down Expand Up @@ -1265,3 +1292,33 @@ def on_test_stop_fail(*args, **kwargs):
self.assertEqual(0, test_stop_run[0])
runner.stop()
self.assertEqual(2, test_stop_run[0])

def test_stop_timeout_with_ramp_down(self):
class MyTaskSet(TaskSet):
@task
def my_task(self):
gevent.sleep(1)

class MyTestUser(User):
tasks = [MyTaskSet]
wait_time = constant(0)

environment = Environment(user_classes=[MyTestUser], stop_timeout=2)
runner = environment.create_local_runner()

# Start load test, wait for users to start, then trigger ramp down
runner.start(10, 10, wait=False)
sleep(1)
runner.start(2, 4, wait=False)

# Wait a moment and then ensure the user count has started to drop but
# not immediately to user_count
sleep(1)
user_count = len(runner.user_greenlets)
self.assertTrue(user_count > 5, "User count has decreased too quickly: %i" % user_count)
self.assertTrue(user_count < 10, "User count has not decreased at all: %i" % user_count)

# Wait and ensure load test users eventually dropped to desired count
sleep(2)
user_count = len(runner.user_greenlets)
self.assertTrue(user_count == 2, "User count has not decreased correctly to 2, it is : %i" % user_count)