Skip to content

Commit

Permalink
Merge pull request #1560 from locustio/refactor-and-fix-delayed-user-…
Browse files Browse the repository at this point in the history
…stopping-in-combination-with-on_stop

Refactor and fix delayed user stopping in combination with on_stop
  • Loading branch information
cyberw authored Sep 16, 2020
2 parents e682038 + 2c1cdcf commit 6871e6b
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 36 deletions.
27 changes: 17 additions & 10 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,18 @@
from time import time

import gevent
import greenlet
import psutil
from gevent.pool import Group

from . import User
from .log import greenlet_exception_logger
from .rpc import Message, rpc
from .stats import RequestStats, setup_distributed_stats_event_listeners

from .exception import RPCError
from .user.task import LOCUST_STATE_STOPPING


logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -224,25 +228,28 @@ def stop_users(self, user_count, stop_rate=None):
sleep_time = 1.0 / stop_rate
logger.info("Stopping %i users at rate of %g users/s" % (user_count, stop_rate))

if self.environment.stop_timeout:
stop_group = Group()
async_calls_to_stop = Group()
stop_group = Group()

while True:
user_to_stop = to_stop.pop(random.randint(0, len(to_stop) - 1))
user_to_stop: User = to_stop.pop(random.randint(0, len(to_stop) - 1))
logger.debug("Stopping %s" % user_to_stop._greenlet.name)
if self.environment.stop_timeout:
if not user_to_stop.stop(self.user_greenlets, force=False):
# User.stop() returns False if the greenlet was not stopped, so we'll need
# to add it's greenlet to our stopping Group so we can wait for it to finish it's task
stop_group.add(user_to_stop._greenlet)
if user_to_stop._greenlet is greenlet.getcurrent():
# User called runner.quit(), so dont block waiting for killing to finish"
user_to_stop._group.killone(user_to_stop._greenlet, block=False)
elif self.environment.stop_timeout:
async_calls_to_stop.add(gevent.spawn_later(0, User.stop, user_to_stop, force=False))
stop_group.add(user_to_stop._greenlet)
else:
user_to_stop.stop(self.user_greenlets, force=True)
async_calls_to_stop.add(gevent.spawn_later(0, User.stop, user_to_stop, force=True))
if to_stop:
gevent.sleep(sleep_time)
else:
break

if self.environment.stop_timeout and not stop_group.join(timeout=self.environment.stop_timeout):
async_calls_to_stop.join()

if not stop_group.join(timeout=self.environment.stop_timeout):
logger.info(
"Not all users finished their tasks & terminated in %s seconds. Stopping them..."
% self.environment.stop_timeout
Expand Down
6 changes: 3 additions & 3 deletions locust/test/test_locust_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ class MyUser(User):
user = MyUser(self.environment)
user.start(group)
sleep(0.05)
user.stop(group)
user.stop()
sleep(0)

self.assertTrue(user.t2_executed)
Expand Down Expand Up @@ -526,7 +526,7 @@ def t(self):
self.assertEqual(1, user.test_state)

# stop User gracefully
user.stop(group, force=False)
user.stop(force=False)
sleep(0)
# make sure instance is not killed right away
self.assertIn(greenlet, group)
Expand Down Expand Up @@ -555,7 +555,7 @@ def t(self):
self.assertEqual(1, user.test_state)

# stop User gracefully
user.stop(group, force=True)
user.stop(force=True)
sleep(0)
# make sure instance is killed right away, and that the task did NOT get to finish
self.assertEqual(0, len(group))
Expand Down
35 changes: 34 additions & 1 deletion locust/test/test_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import gevent
from gevent import sleep
from gevent.queue import Queue
import greenlet

import locust
from locust import runners, between, constant, LoadTestShape
Expand Down Expand Up @@ -360,14 +361,18 @@ def test_runner_reference_on_environment(self):
self.assertEqual(env, runner.environment)
self.assertEqual(runner, env.runner)

def test_users_can_call_runner_quit(self):
def test_users_can_call_runner_quit_without_deadlocking(self):
class BaseUser(User):
wait_time = constant(0)
stop_triggered = False

@task
def trigger(self):
self.environment.runner.quit()

def on_stop(self):
BaseUser.stop_triggered = True

runner = Environment(user_classes=[BaseUser]).create_local_runner()
runner.spawn_users(1, 1, wait=False)
timeout = gevent.Timeout(0.5)
Expand All @@ -379,6 +384,34 @@ def trigger(self):
finally:
timeout.cancel()

self.assertTrue(BaseUser.stop_triggered)

def test_runner_quit_can_run_on_stop_for_multiple_users_concurrently(self):
class BaseUser(User):
wait_time = constant(0)
stop_count = 0

@task
def trigger(self):
pass

def on_stop(self):
gevent.sleep(0.1)
BaseUser.stop_count += 1

runner = Environment(user_classes=[BaseUser]).create_local_runner()
runner.spawn_users(10, 10, wait=False)
timeout = gevent.Timeout(0.3)
timeout.start()
try:
runner.quit()
except gevent.Timeout:
self.fail("Got Timeout exception, runner must have hung somehow.")
finally:
timeout.cancel()

self.assertEqual(10, BaseUser.stop_count) # verify that all users executed on_stop

def test_stop_users_with_spawn_rate(self):
class MyUser(User):
wait_time = constant(1)
Expand Down
13 changes: 6 additions & 7 deletions locust/user/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,8 @@ def run(self):
self.schedule_task(self.get_next_task())

try:
self._check_stop_condition()
if self.user._state == LOCUST_STATE_STOPPING:
raise StopUser()
self.execute_next_task()
except RescheduleTaskImmediately:
pass
Expand Down Expand Up @@ -372,19 +373,17 @@ def wait(self):
set a stop_timeout. If this behaviour is not desired you should make the user wait using
gevent.sleep() instead.
"""
self._check_stop_condition()
if self.user._state == LOCUST_STATE_STOPPING:
raise StopUser()
self.user._state = LOCUST_STATE_WAITING
self._sleep(self.wait_time())
self._check_stop_condition()
if self.user._state == LOCUST_STATE_STOPPING:
raise StopUser()
self.user._state = LOCUST_STATE_RUNNING

def _sleep(self, seconds):
gevent.sleep(seconds)

def _check_stop_condition(self):
if self.user._state == LOCUST_STATE_STOPPING:
raise StopUser()

def interrupt(self, reschedule=True):
"""
Interrupt the TaskSet and hand over execution control back to the parent TaskSet.
Expand Down
26 changes: 11 additions & 15 deletions locust/user/users.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from gevent import GreenletExit, greenlet

from gevent.pool import Group
from locust.clients import HttpSession
from locust.exception import LocustError, StopUser
from locust.util import deprecation
Expand Down Expand Up @@ -111,7 +111,8 @@ class ForumPage(TaskSet):

client = NoClientWarningRaiser()
_state = None
_greenlet = None
_greenlet: greenlet.Greenlet = None
_group: Group
_taskset_instance = None

def __init__(self, environment):
Expand Down Expand Up @@ -154,11 +155,11 @@ def wait(self):
"""
self._taskset_instance.wait()

def start(self, gevent_group):
def start(self, group: Group):
"""
Start a greenlet that runs this User instance.
:param gevent_group: Group instance where the greenlet will be spawned.
:param group: Group instance where the greenlet will be spawned.
:type gevent_group: gevent.pool.Group
:returns: The spawned greenlet.
"""
Expand All @@ -171,26 +172,21 @@ def run_user(user):
"""
user.run()

self._greenlet = gevent_group.spawn(run_user, self)
self._greenlet = group.spawn(run_user, self)
self._group = group
return self._greenlet

def stop(self, gevent_group, force=False):
def stop(self, force=False):
"""
Stop the user greenlet that exists in the gevent_group.
Stop the user greenlet.
:param gevent_group: Group instance where the greenlet will be spawned.
:type gevent_group: gevent.pool.Group
:param force: If False (the default) the stopping is done gracefully by setting the state to LOCUST_STATE_STOPPING
which will make the User instance stop once any currently running task is complete and on_stop
methods are called. If force is True the greenlet will be killed immediately.
:returns: True if the greenlet was killed immediately, otherwise False
"""
if self._greenlet is greenlet.getcurrent():
# the user is stopping itself (from within a task), so blocking would deadlock
gevent_group.killone(self._greenlet, block=False)
return True
elif force or self._state == LOCUST_STATE_WAITING:
gevent_group.killone(self._greenlet)
if force or self._state == LOCUST_STATE_WAITING:
self._group.killone(self._greenlet)
return True
elif self._state == LOCUST_STATE_RUNNING:
self._state = LOCUST_STATE_STOPPING
Expand Down

0 comments on commit 6871e6b

Please sign in to comment.