Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add functionality to run listener functions for custom_messages concurrently #2650

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion docs/running-distributed.rst
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,20 @@ order to coordinate the test. This can be easily accomplished with custom messag
environment.runner.send_message('test_users', users)

Note that when running locally (i.e. non-distributed), this functionality will be preserved;
the messages will simply be handled by the runner that sends them.
the messages will simply be handled by the runner that sends them.

.. note::
Using the default options while registering a message handler will run the listener function
in a **blocking** way, resulting in the heartbeat and other messages being delayed for the amount
of the execution.
If it is known that the listener function will handle time-intensive tasks, it is possible to register the
function as **concurrent** (as a separate greenlet).

.. code-block::
environment.runner.register_message('test_users', setup_test_users, concurrent=True)

Please use this feature with care, as otherwise it could result in greenlets running and influencing
the running loadtest.

For more details, see the `complete example <https://github.com/locustio/locust/tree/master/examples/custom_messages.py>`_.

Expand Down
12 changes: 12 additions & 0 deletions examples/custom_messages.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
from locust import HttpUser, between, events, task
from locust.runners import MasterRunner, WorkerRunner

import gevent

usernames = []


def setup_test_users(environment, msg, **kwargs):
# Fired when the worker receives a message of type 'test_users'
usernames.extend(map(lambda u: u["name"], msg.data))
# Even though "acknowledge_concurrent_users" was sent first, "acknowledge_users"
# will print its statement first, as "acknowledge_concurrent_users" was registered
# running concurrently, and therefore not blocking other messages.
environment.runner.send_message("concurrent_message", "This is a non blocking message")
environment.runner.send_message("acknowledge_users", f"Thanks for the {len(msg.data)} users!")


Expand All @@ -15,12 +21,18 @@ def on_acknowledge(msg, **kwargs):
print(msg.data)


def on_concurrent_message(msg, **kwargs):
gevent.sleep(10)
print(msg.data)


@events.init.add_listener
def on_locust_init(environment, **_kwargs):
if not isinstance(environment.runner, MasterRunner):
environment.runner.register_message("test_users", setup_test_users)
if not isinstance(environment.runner, WorkerRunner):
environment.runner.register_message("acknowledge_users", on_acknowledge)
environment.runner.register_message("concurrent_message", on_concurrent_message, concurrent=True)


@events.test_start.add_listener
Expand Down
20 changes: 14 additions & 6 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def __init__(self, environment: Environment) -> None:
self.target_user_classes_count: dict[str, int] = {}
# target_user_count is set before the ramp-up/ramp-down occurs.
self.target_user_count: int = 0
self.custom_messages: dict[str, Callable] = {}
self.custom_messages: dict[str, tuple[Callable, bool]] = {}

self._users_dispatcher: UsersDispatcher | None = None

Expand Down Expand Up @@ -420,7 +420,7 @@ def log_exception(self, node_id: str, msg: str, formatted_tb: str) -> None:
row["nodes"].add(node_id)
self.exceptions[key] = row

def register_message(self, msg_type: str, listener: Callable) -> None:
def register_message(self, msg_type: str, listener: Callable, concurrent=False) -> None:
"""
Register a listener for a custom message from another node

Expand All @@ -429,7 +429,7 @@ def register_message(self, msg_type: str, listener: Callable) -> None:
"""
if msg_type in self.custom_messages:
raise Exception(f"Tried to register listener method for {msg_type}, but it already had a listener!")
self.custom_messages[msg_type] = listener
self.custom_messages[msg_type] = (listener, concurrent)


class LocalRunner(Runner):
Expand Down Expand Up @@ -568,7 +568,7 @@ def send_message(self, msg_type: str, data: Any | None = None, client_id: str |
"""
logger.debug("Running locally: sending %s message to self" % msg_type)
if msg_type in self.custom_messages:
listener = self.custom_messages[msg_type]
listener, concurrent = self.custom_messages[msg_type]
msg = Message(msg_type, data, "local")
listener(environment=self.environment, msg=msg)
else:
Expand Down Expand Up @@ -1139,7 +1139,11 @@ def client_listener(self) -> NoReturn:
f"Received {msg.type} message from worker {msg.node_id} (index {self.get_worker_index(msg.node_id)})"
)
try:
self.custom_messages[msg.type](environment=self.environment, msg=msg)
listener, concurrent = self.custom_messages[msg.type]
if not concurrent:
listener(environment=self.environment, msg=msg)
else:
gevent.spawn(listener, self.environment, msg)
except Exception:
logging.error(f"Uncaught exception in handler for {msg.type}\n{traceback.format_exc()}")

Expand Down Expand Up @@ -1393,7 +1397,11 @@ def worker(self) -> NoReturn:
self.last_heartbeat_timestamp = time.time()
elif msg.type in self.custom_messages:
logger.debug("Received %s message from master" % msg.type)
self.custom_messages[msg.type](environment=self.environment, msg=msg)
listener, concurrent = self.custom_messages[msg.type]
if not concurrent:
listener(environment=self.environment, msg=msg)
else:
gevent.spawn(listener, self.environment, msg)
else:
logger.warning(f"Unknown message type received: {msg.type}")

Expand Down
25 changes: 25 additions & 0 deletions locust/test/test_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,31 @@ def on_custom_msg(msg, **kw):
self.assertTrue(test_custom_msg[0])
self.assertEqual(123, test_custom_msg_data[0]["test_data"])

def test_concurrent_custom_message(self):
class MyUser(User):
wait_time = constant(1)

@task
def my_task(self):
pass

test_custom_msg = [False]
test_custom_msg_data = [{}]

def on_custom_msg(msg, **kw):
test_custom_msg[0] = True
test_custom_msg_data[0] = msg.data

environment = Environment(user_classes=[MyUser])
runner = LocalRunner(environment)

runner.register_message("test_custom_msg", on_custom_msg, concurrent=True)
runner.send_message("test_custom_msg", {"test_data": 123})

gevent.sleep(0.5)
self.assertTrue(test_custom_msg[0])
self.assertEqual(123, test_custom_msg_data[0]["test_data"])

def test_undefined_custom_message(self):
class MyUser(User):
wait_time = constant(1)
Expand Down
Loading