Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add gRPC load test example #1755

Merged
merged 2 commits into from
May 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions docs/testing-other-systems.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,24 @@ We can build a generic XML-RPC client, by wrapping :py:class:`xmlrpc.client.Serv
.. literalinclude:: ../examples/custom_xmlrpc_client/xmlrpc_locustfile.py

For more examples, see `locust-plugins <https://github.com/SvenskaSpel/locust-plugins#users>`_

Example: writing a gRPC User/client
=======================================

Similarly to the XML-RPC example, we can also load test a gRPC server.

.. literalinclude:: ../examples/grpc/hello_server.py

In this case, the gRPC stub methods can also be wrapped so that we can record the request stats.

.. literalinclude:: ../examples/grpc/locustfile.py

Note: In order to make the `grpcio` Python library gevent-compatible the following code needs to be executed before creating the gRPC channel.

```python
import grpc.experimental.gevent as grpc_gevent
grpc_gevent.init_gevent()
```

Note: It is important to close the gRPC channel before stopping the User greenlet; otherwise Locust may not be able to stop executing.
This is due to an issue in `grpcio` (see `grpc#15880 <https://github.com/grpc/grpc/issues/15880>`_).
15 changes: 15 additions & 0 deletions examples/grpc/hello.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
syntax = "proto3";

package locust.hello;

service HelloService {
rpc SayHello (HelloRequest) returns (HelloResponse) {}
}

message HelloRequest {
string name = 1;
}

message HelloResponse {
string message = 1;
}
159 changes: 159 additions & 0 deletions examples/grpc/hello_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

77 changes: 77 additions & 0 deletions examples/grpc/hello_pb2_grpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

import hello_pb2 as hello__pb2


class HelloServiceStub(object):
"""Missing associated documentation comment in .proto file."""

def __init__(self, channel):
"""Constructor.

Args:
channel: A grpc.Channel.
"""
self.SayHello = channel.unary_unary(
"/locust.hello.HelloService/SayHello",
request_serializer=hello__pb2.HelloRequest.SerializeToString,
response_deserializer=hello__pb2.HelloResponse.FromString,
)


class HelloServiceServicer(object):
"""Missing associated documentation comment in .proto file."""

def SayHello(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details("Method not implemented!")
raise NotImplementedError("Method not implemented!")


def add_HelloServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
"SayHello": grpc.unary_unary_rpc_method_handler(
servicer.SayHello,
request_deserializer=hello__pb2.HelloRequest.FromString,
response_serializer=hello__pb2.HelloResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler("locust.hello.HelloService", rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))


# This class is part of an EXPERIMENTAL API.
class HelloService(object):
"""Missing associated documentation comment in .proto file."""

@staticmethod
def SayHello(
request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None,
):
return grpc.experimental.unary_unary(
request,
target,
"/locust.hello.HelloService/SayHello",
hello__pb2.HelloRequest.SerializeToString,
hello__pb2.HelloResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
)
24 changes: 24 additions & 0 deletions examples/grpc/hello_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import hello_pb2_grpc
import hello_pb2
import grpc
from concurrent import futures
import logging
import time

logger = logging.getLogger(__name__)


class HelloServiceServicer(hello_pb2_grpc.HelloServiceServicer):
def SayHello(self, request, context):
name = request.name
time.sleep(1)
return hello_pb2.HelloResponse(message=f"Hello from Locust, {name}!")


def start_server():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
hello_pb2_grpc.add_HelloServiceServicer_to_server(HelloServiceServicer(), server)
server.add_insecure_port("localhost:50051")
server.start()
logger.info("gRPC server started")
server.wait_for_termination()
82 changes: 82 additions & 0 deletions examples/grpc/locustfile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import grpc
import hello_pb2_grpc
import hello_pb2
from locust import events, User, task
from locust.exception import LocustError
from locust.user.task import LOCUST_STATE_STOPPING
from hello_server import start_server
import gevent
import time

# patch grpc so that it uses gevent instead of asyncio
import grpc.experimental.gevent as grpc_gevent

grpc_gevent.init_gevent()


@events.init.add_listener
def run_grpc_server(environment, **_kwargs):
gevent.spawn(start_server)


class GrpcClient:
def __init__(self, stub):
self._stub_class = stub.__class__
self._stub = stub

def __getattr__(self, name):
func = self._stub_class.__getattribute__(self._stub, name)

def wrapper(*args, **kwargs):
start_time = time.monotonic()
request_meta = {
"request_type": "grpc",
"name": name,
"response_length": 0,
"exception": None,
"context": None,
"response": None,
}
try:
request_meta["response"] = func(*args, **kwargs)
request_meta["response_length"] = len(request_meta["response"].message)
except grpc.RpcError as e:
request_meta["exception"] = e
request_meta["response_time"] = (time.monotonic() - start_time) * 1000
events.request.fire(**request_meta)
return request_meta["response"]

return wrapper


class GrpcUser(User):
abstract = True

stub_class = None

def __init__(self, environment):
super().__init__(environment)
for attr_value, attr_name in ((self.host, "host"), (self.stub_class, "stub_class")):
if attr_value is None:
raise LocustError(f"You must specify the {attr_name}.")
self._channel = grpc.insecure_channel(self.host)
self._channel_closed = False
stub = self.stub_class(self._channel)
self.client = GrpcClient(stub)

def stop(self, force=False):
self._channel_closed = True
time.sleep(1)
self._channel.close()
super().stop(force=True)


class HelloGrpcUser(GrpcUser):
host = "localhost:50051"
stub_class = hello_pb2_grpc.HelloServiceStub

@task
def sayHello(self):
if not self._channel_closed:
self.client.SayHello(hello_pb2.HelloRequest(name="Test"))
time.sleep(1)
4 changes: 2 additions & 2 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,10 @@ def stop_users(self, user_count, stop_rate=None):
# User called runner.quit(), so dont block waiting for killing to finish"
user_to_stop._group.killone(user_to_stop._greenlet, block=False)
elif self.environment.stop_timeout:
async_calls_to_stop.add(gevent.spawn_later(0, User.stop, user_to_stop, force=False))
async_calls_to_stop.add(gevent.spawn_later(0, user_to_stop.stop, force=False))
stop_group.add(user_to_stop._greenlet)
else:
async_calls_to_stop.add(gevent.spawn_later(0, User.stop, user_to_stop, force=True))
async_calls_to_stop.add(gevent.spawn_later(0, user_to_stop.stop, force=True))
if to_stop:
gevent.sleep(sleep_time)
else:
Expand Down