Skip to content

Commit

Permalink
Make --no-web work with distributed. This works by adding several opt…
Browse files Browse the repository at this point in the history
…ions to poll for connections between masters and slaves. Also adds options that allow locusts to iterate through all locustfiles and collect stats for each run
  • Loading branch information
Justin Iso authored and justiniso committed Jan 7, 2016
1 parent c4b8105 commit 36f23be
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 22 deletions.
96 changes: 76 additions & 20 deletions locust/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,24 @@
import inspect
import logging
import socket
import time
from optparse import OptionParser
from gevent.pool import Group

import web
from log import setup_logging, console_logger
from stats import stats_printer, print_percentile_stats, print_error_report, print_stats
from inspectlocust import print_task_ratio, get_task_ratio_dict
from core import Locust, HttpLocust
from runners import MasterLocustRunner, SlaveLocustRunner, LocalLocustRunner
from runners import MasterLocustRunner, SlaveLocustRunner, LocalLocustRunner, NoWebMasterLocustRunner
import events

import polling

_internals = [Locust, HttpLocust]
version = locust.version


def parse_options():
"""
Handle command-line options with optparse.OptionParser.
Expand Down Expand Up @@ -116,6 +121,15 @@ def parse_options():
help="Port that locust master should bind to. Only used when running with --master. Defaults to 5557. Note that Locust will also use this port + 1, so by default the master node will bind to 5557 and 5558."
)

parser.add_option(
'--min-slaves',
action='store',
type='int',
dest='min_slaves',
default=1,
help="The minimum number of slaves for the master to expect before it starts swarming. Only used when running with --master and --no-web."
)

# if we should print stats in the console
parser.add_option(
'--no-web',
Expand Down Expand Up @@ -154,6 +168,24 @@ def parse_options():
default=None,
help="Number of requests to perform. Only used together with --no-web"
)

parser.add_option(
'-t', '--timeout',
action='store',
type='int',
dest='timeout',
default=None,
help="Maximum number of seconds to run each locustfile for. Note that there may be multiple locustfiles and this is not a global timeout."
)

parser.add_option(
'--cooldown',
action='store',
type='int',
dest='cooldown',
default=0,
help='Number of seconds to wait before running the next locustfile.'
)

# log level
parser.add_option(
Expand Down Expand Up @@ -437,32 +469,56 @@ def main():
}
console_logger.info(dumps(task_data))
sys.exit(0)

# if --master is set, make sure --no-web isn't set
if options.master and options.no_web:
logger.error("Locust can not run distributed with the web interface disabled (do not use --no-web and --master together)")
sys.exit(0)

if options.master and options.no_web and not options.min_slaves:
logger.error("When running --master and --no-web, you must specify --min-slaves to be available before starting to swarm")
sys.exit(1)

if options.master and options.no_web and not (options.timeout or options.num_requests):
logger.error("When running --master and --no-web, you must specify either --num-request or --timeout to tell the slaves when to stop running each locustfile")
sys.exit(1)

if not options.no_web and not options.slave:
# spawn web greenlet
logger.info("Starting web monitor at %s:%s" % (options.web_host or "*", options.port))
main_greenlet = gevent.spawn(web.start, locust_classes, options)

if not options.master and not options.slave:
runners.locust_runner = LocalLocustRunner(locust_classes, options, available_locustfiles=all_locustfiles)
# spawn client spawning/hatching greenlet
if options.no_web:
runners.locust_runner.start_hatching(wait=True)
main_greenlet = runners.locust_runner.greenlet
elif options.master:
runners.locust_runner = MasterLocustRunner(locust_classes, options, available_locustfiles=all_locustfiles)
elif options.slave:

if options.slave:
logger.info("Waiting for master to become available")
try:
runners.locust_runner = SlaveLocustRunner(locust_classes, options, available_locustfiles=all_locustfiles)
main_greenlet = runners.locust_runner.greenlet
except socket.error, e:
logger.error("Failed to connect to the Locust master: %s", e)
runners.locust_runner = polling.poll(
lambda: SlaveLocustRunner(locust_classes, options, available_locustfiles=all_locustfiles),
timeout=60,
step=1,
ignore_exceptions=(socket.error,))

except polling.TimeoutException, e:
logger.error("Failed to connect to the Locust master: %s", e.last)
sys.exit(-1)

main_greenlet = runners.locust_runner.greenlet

elif options.master:

if options.no_web:
# Just start running the suites as soon as the minimum number of slaves come online
runners.locust_runner = NoWebMasterLocustRunner(locust_classes, options, available_locustfiles=all_locustfiles)

logger.info("Waiting for {} slaves to become available before swarming".format(options.min_slaves))
try:
runners.locust_runner.wait_for_slaves(options.min_slaves, timeout=60)
except polling.TimeoutException, e:
logger.error("Minimum expected slaves were never available. Expected {} ({} available)".format(options.min_slaves, e.last))
sys.exit(1)

runners.locust_runner.slaves_start_swarming(
max_num_requests=options.num_requests,
max_seconds_elapsed=options.timeout)

else:
runners.locust_runner = MasterLocustRunner(locust_classes, options, available_locustfiles=all_locustfiles)

main_greenlet = runners.locust_runner.greenlet

if not options.only_summary and (options.print_stats or (options.no_web and not options.slave)):
# spawn stats printing greenlet
Expand Down
78 changes: 77 additions & 1 deletion locust/runners.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# coding=UTF-8
import copy
import socket
import traceback
import warnings
Expand All @@ -8,6 +9,7 @@
from hashlib import md5

import gevent
import polling
from gevent import GreenletExit
from gevent.pool import Group

Expand Down Expand Up @@ -326,7 +328,7 @@ def switch(self, key):
"""
Switch to a different set of locust classes
"""
self.stop()
self.stats.clear_all()
for client in self.clients.itervalues():
self.server.send(Message("switch", {"key": key}, None))

Expand Down Expand Up @@ -376,6 +378,72 @@ def client_listener(self):
def slave_count(self):
return len(self.clients.ready) + len(self.clients.hatching) + len(self.clients.running)


class NoWebMasterLocustRunner(MasterLocustRunner):

def __init__(self, *args, **kwargs):
super(NoWebMasterLocustRunner, self).__init__(*args, **kwargs)
self.all_stats = {}
self.greenlet = Group()
self.max_num_requests = None
self.max_seconds_elapsed = None

def run_locustfiles(self):
"""
Iterate through all locustfiles and run each locustfile until the maximum number of requests are hit or
the maximum timeout in seconds has elapsed. When one of those conditions are met on each locustfile,
move on to the next one, storing the stats result of each locustfile run.
"""
for run_count, locustfile_key in enumerate(self.available_locustfiles.keys()):
self.switch(locustfile_key)
super(NoWebMasterLocustRunner, self).start_hatching(self.num_clients, self.hatch_rate)

while True:
import time

hit_max_requests = self.max_num_requests and (self.stats.aggregated_stats().num_requests >= self.max_num_requests)
hit_max_elapsed_time = self.max_seconds_elapsed and (time.time() - self.stats.start_time >= self.max_seconds_elapsed)

if hit_max_requests or hit_max_elapsed_time:
break

time.sleep(1)

self.all_stats[locustfile_key] = copy.deepcopy(self.stats)
self.stop()

def wait_for_slaves(self, min_slaves, timeout):
"""
Wait the specified timeout seconds until the minimum number of slaves come online
:param min_slaves: Minimum number of slaves to expect
:param timeout: Max number of seconds to wait before raising an exception
:raises: polling.TimeoutException
:return: The count of slaves currently available
"""
return polling.poll(
lambda: len(self.clients.ready),
check_success=lambda ready: ready >= min_slaves,
timeout=timeout,
step=1)

def slaves_start_swarming(self, max_num_requests=None, max_seconds_elapsed=None):
"""
Instruct the slaves to start swarming for the available locustfiles asynchronously
:param max_num_requests: Stop when the total number of requests for a locustfile reach this number
:param max_seconds_elapsed: Stop when the total elapsed seconds for a locustfile reach this number
"""
if max_num_requests is not None:
self.max_num_requests = max_num_requests
elif max_seconds_elapsed is not None:
self.max_seconds_elapsed = max_seconds_elapsed
else:
raise ValueError('You must specify the total number or requests to be made or a timeout')

self.state = STATE_HATCHING
self.greenlet.spawn(self.run_locustfiles).link_exception(callback=self.noop)

class SlaveLocustRunner(DistributedLocustRunner):
def __init__(self, *args, **kwargs):
super(SlaveLocustRunner, self).__init__(*args, **kwargs)
Expand All @@ -387,6 +455,11 @@ def __init__(self, *args, **kwargs):
self.greenlet.spawn(self.worker).link_exception(callback=self.noop)
self.client.send(Message("client_ready", None, self.client_id))
self.greenlet.spawn(self.stats_reporter).link_exception(callback=self.noop)

# Register listener for when a locust starts hatching
def on_locust_start_hatching():
self.client.send(Message("hatching", None, self.client_id))
events.locust_start_hatching += on_locust_start_hatching

# register listener for when all locust users have hatched, and report it to the master node
def on_hatch_complete(user_count):
Expand Down Expand Up @@ -421,7 +494,10 @@ def worker(self):
self.host = job["host"]
self.hatching_greenlet = gevent.spawn(lambda: self.start_hatching(locust_count=job["num_clients"], hatch_rate=job["hatch_rate"]))
elif msg.type == "switch":
self.stop()
self.switch(msg.data["key"])
self.client.send(Message("hatching", None, self.client_id))
self.hatching_greenlet = gevent.spawn(lambda: self.start_hatching(locust_count=self.num_clients, hatch_rate=self.hatch_rate))
elif msg.type == "stop":
self.stop()
self.client.send(Message("client_stopped", None, self.client_id))
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def run(self):
packages=find_packages(exclude=['ez_setup', 'examples', 'tests']),
include_package_data=True,
zip_safe=False,
install_requires=["gevent==1.0.1", "flask>=0.10.1", "requests>=2.4.1", "msgpack-python>=0.4.2"],
install_requires=["gevent==1.0.1", "flask>=0.10.1", "polling==0.2.0", "requests>=2.4.1", "msgpack-python>=0.4.2"],
tests_require=['unittest2', 'mock', 'pyzmq'],
entry_points={
'console_scripts': [
Expand Down

0 comments on commit 36f23be

Please sign in to comment.