Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow multiple nodes per IP #804

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions esrally/mechanic/mechanic.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,20 @@ def __init__(self, cfg, open_metrics_context, cluster_settings, sources, build,
self.port = port
self.node_id = node_id

def for_nodes(self, all_node_ips=None, ip=None, port=None, node_ids=None):
def for_nodes(self, all_node_ips=None, all_node_ids=None, ip=None, port=None, node_ids=None):
"""

Creates a StartNodes instance for a concrete IP, port and their associated node_ids.

:param all_node_ips: The IPs of all nodes in the cluster (including the current one).
:param all_node_ids: The numeric id of all nodes in the cluster (including the current one).
:param ip: The IP to set.
:param port: The port number to set.
:param node_ids: A list of node id to set.
:return: A corresponding ``StartNodes`` message with the specified IP, port number and node ids.
"""
return StartNodes(self.cfg, self.open_metrics_context, self.cluster_settings, self.sources, self.build, self.distribution,
self.external, self.docker, all_node_ips, ip, port, node_ids)
self.external, self.docker, all_node_ips, all_node_ids, ip, port, node_ids)


class EngineStarted:
Expand Down Expand Up @@ -99,7 +100,7 @@ def __init__(self, reset_in_seconds):

class StartNodes:
def __init__(self, cfg, open_metrics_context, cluster_settings, sources, build, distribution, external, docker,
all_node_ips, ip, port, node_ids):
all_node_ips, all_node_ids, ip, port, node_ids):
self.cfg = cfg
self.open_metrics_context = open_metrics_context
self.cluster_settings = cluster_settings
Expand All @@ -109,6 +110,7 @@ def __init__(self, cfg, open_metrics_context, cluster_settings, sources, build,
self.external = external
self.docker = docker
self.all_node_ips = all_node_ips
self.all_node_ids = all_node_ids
self.ip = ip
self.port = port
self.node_ids = node_ids
Expand Down Expand Up @@ -175,6 +177,13 @@ def extract_all_node_ips(ip_port_pairs):
return all_node_ips


def extract_all_node_ids(all_nodes_by_host):
all_node_ids = set()
for node_ids_per_host in all_nodes_by_host.values():
all_node_ids.update(node_ids_per_host)
return all_node_ids


def nodes_by_host(ip_port_pairs):
nodes = {}
node_id = 0
Expand Down Expand Up @@ -361,9 +370,11 @@ def receiveMsg_StartEngine(self, startmsg, sender):
self.remotes = defaultdict(list)
all_ips_and_ports = to_ip_port(startmsg.hosts)
all_node_ips = extract_all_node_ips(all_ips_and_ports)
all_nodes_by_host = nodes_by_host(all_ips_and_ports)
all_node_ids = extract_all_node_ids(all_nodes_by_host)

for (ip, port), node in nodes_by_host(all_ips_and_ports).items():
submsg = startmsg.for_nodes(all_node_ips, ip, port, node)
for (ip, port), node in all_nodes_by_host.items():
submsg = startmsg.for_nodes(all_node_ips, all_node_ids, ip, port, node)
submsg.reply_to = sender
if '127.0.0.1' == ip:
m = self.createActor(NodeMechanicActor,
Expand Down Expand Up @@ -461,8 +472,8 @@ def receiveMsg_StartNodes(self, msg, sender):
self.metrics_store.open(ctx=msg.open_metrics_context)
# avoid follow-up errors in case we receive an unexpected ActorExitRequest due to an early failure in a parent actor.

self.mechanic = create(self.config, self.metrics_store, msg.all_node_ips, msg.cluster_settings, msg.sources, msg.build,
msg.distribution, msg.external, msg.docker)
self.mechanic = create(self.config, self.metrics_store, msg.all_node_ips, msg.all_node_ids, msg.cluster_settings,
msg.sources, msg.build, msg.distribution, msg.external, msg.docker)
nodes = self.mechanic.start_engine()
self.running = True
self.wakeupAfter(METRIC_FLUSH_INTERVAL_SECONDS)
Expand Down Expand Up @@ -535,7 +546,7 @@ def load_team(cfg, external):
return car, plugins


def create(cfg, metrics_store, all_node_ips, cluster_settings=None, sources=False, build=False, distribution=False, external=False,
def create(cfg, metrics_store, all_node_ips, all_node_ids, cluster_settings=None, sources=False, build=False, distribution=False, external=False,
docker=False):
races_root = paths.races_root(cfg)
challenge_root_path = paths.race_root(cfg)
Expand All @@ -546,7 +557,7 @@ def create(cfg, metrics_store, all_node_ips, cluster_settings=None, sources=Fals
s = supplier.create(cfg, sources, distribution, build, challenge_root_path, car, plugins)
p = []
for node_id in node_ids:
p.append(provisioner.local_provisioner(cfg, car, plugins, cluster_settings, all_node_ips, challenge_root_path, node_id))
p.append(provisioner.local_provisioner(cfg, car, plugins, cluster_settings, all_node_ips, all_node_ids, challenge_root_path, node_id))
l = launcher.ProcessLauncher(cfg, metrics_store, races_root)
elif external:
raise exceptions.RallyAssertionError("Externally provisioned clusters should not need to be managed by Rally's mechanic")
Expand Down
9 changes: 6 additions & 3 deletions esrally/mechanic/provisioner.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from esrally.utils import console, io, process, versions


def local_provisioner(cfg, car, plugins, cluster_settings, all_node_ips, target_root, node_id):
def local_provisioner(cfg, car, plugins, cluster_settings, all_node_ips, all_node_ids, target_root, node_id):
distribution_version = cfg.opts("mechanic", "distribution.version", mandatory=False)
ip = cfg.opts("provisioning", "node.ip")
http_port = cfg.opts("provisioning", "node.http.port")
Expand All @@ -36,10 +36,11 @@ def local_provisioner(cfg, car, plugins, cluster_settings, all_node_ips, target_

node_name = "%s-%d" % (node_name_prefix, node_id)
node_root_dir = "%s/%s" % (target_root, node_name)
all_node_names = ["%s-%d" % (node_name_prefix, n) for n in all_node_ids]

_, java_home = java_resolver.java_home(car, cfg)

es_installer = ElasticsearchInstaller(car, java_home, node_name, node_root_dir, all_node_ips, ip, http_port)
es_installer = ElasticsearchInstaller(car, java_home, node_name, node_root_dir, all_node_ips, all_node_names, ip, http_port)
plugin_installers = [PluginInstaller(plugin, java_home) for plugin in plugins]

return BareProvisioner(cluster_settings, es_installer, plugin_installers, preserve, distribution_version=distribution_version)
Expand Down Expand Up @@ -219,7 +220,7 @@ def _provisioner_variables(self):


class ElasticsearchInstaller:
def __init__(self, car, java_home, node_name, node_root_dir, all_node_ips, ip, http_port, hook_handler_class=team.BootstrapHookHandler):
def __init__(self, car, java_home, node_name, node_root_dir, all_node_ips, all_node_names, ip, http_port, hook_handler_class=team.BootstrapHookHandler):
self.car = car
self.java_home = java_home
self.node_name = node_name
Expand All @@ -228,6 +229,7 @@ def __init__(self, car, java_home, node_name, node_root_dir, all_node_ips, ip, h
self.node_log_dir = "%s/logs/server" % node_root_dir
self.heap_dump_dir = "%s/heapdump" % node_root_dir
self.all_node_ips = all_node_ips
self.all_node_names = all_node_names
self.node_ip = ip
self.http_port = http_port
self.hook_handler = hook_handler_class(self.car)
Expand Down Expand Up @@ -278,6 +280,7 @@ def variables(self):
"http_port": "%d-%d" % (self.http_port, self.http_port + 100),
"transport_port": "%d-%d" % (self.http_port + 100, self.http_port + 200),
"all_node_ips": "[\"%s\"]" % "\",\"".join(self.all_node_ips),
"all_node_names": "[\"%s\"]" % "\",\"".join(self.all_node_names),
# at the moment we are strict and enforce that all nodes are master eligible nodes
"minimum_master_nodes": len(self.all_node_ips),
"install_root_path": self.es_home_path
Expand Down
13 changes: 13 additions & 0 deletions tests/mechanic/provisioner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def null_apply_config(source_root_path, target_root_path, config_vars):
node_name="rally-node-0",
node_root_dir=HOME_DIR + "/.rally/benchmarks/races/unittest",
all_node_ips=["10.17.22.22", "10.17.22.23"],
all_node_names=["rally-node-0", "rally-node-1"],
ip="10.17.22.23",
http_port=9200)

Expand Down Expand Up @@ -81,6 +82,7 @@ def null_apply_config(source_root_path, target_root_path, config_vars):
"http_port": "9200-9300",
"transport_port": "9300-9400",
"all_node_ips": "[\"10.17.22.22\",\"10.17.22.23\"]",
"all_node_names": "[\"rally-node-0\",\"rally-node-1\"]",
"minimum_master_nodes": 2,
"install_root_path": "/opt/elasticsearch-5.0.0"
}, config_vars)
Expand Down Expand Up @@ -152,6 +154,7 @@ def null_apply_config(source_root_path, target_root_path, config_vars):
node_name="rally-node-0",
node_root_dir=HOME_DIR + "/.rally/benchmarks/races/unittest",
all_node_ips=["10.17.22.22", "10.17.22.23"],
all_node_names=["rally-node-0", "rally-node-1"],
ip="10.17.22.23",
http_port=9200)

Expand Down Expand Up @@ -195,6 +198,7 @@ def null_apply_config(source_root_path, target_root_path, config_vars):
"http_port": "9200-9300",
"transport_port": "9300-9400",
"all_node_ips": "[\"10.17.22.22\",\"10.17.22.23\"]",
"all_node_names": "[\"rally-node-0\",\"rally-node-1\"]",
"minimum_master_nodes": 2,
"install_root_path": "/opt/elasticsearch-5.0.0",
"plugin_name": "x-pack-security",
Expand Down Expand Up @@ -229,6 +233,7 @@ def null_apply_config(source_root_path, target_root_path, config_vars):
node_name="rally-node-0",
node_root_dir=HOME_DIR + "/.rally/benchmarks/races/unittest",
all_node_ips=["10.17.22.22", "10.17.22.23"],
all_node_names=["rally-node-0", "rally-node-1"],
ip="10.17.22.23",
http_port=9200)

Expand Down Expand Up @@ -272,6 +277,7 @@ def null_apply_config(source_root_path, target_root_path, config_vars):
"http_port": "9200-9300",
"transport_port": "9300-9400",
"all_node_ips": "[\"10.17.22.22\",\"10.17.22.23\"]",
"all_node_names": "[\"rally-node-0\",\"rally-node-1\"]",
"minimum_master_nodes": 2,
"install_root_path": "/opt/elasticsearch-6.3.0",
"plugin_name": "x-pack-security",
Expand Down Expand Up @@ -304,6 +310,7 @@ def test_cleanup_nothing_on_preserve(self, mock_path_exists, mock_rm):
java_home="/usr/local/javas/java8",
node_name="rally-node-0",
all_node_ips={"127.0.0.1"},
all_node_names=["rally-node-0"],
ip="127.0.0.1",
http_port=9200,
node_root_dir=HOME_DIR + "/.rally/benchmarks/races/unittest")
Expand All @@ -324,6 +331,7 @@ def test_cleanup(self, mock_path_exists, mock_rm):
java_home="/usr/local/javas/java8",
node_name="rally-node-0",
all_node_ips={"127.0.0.1"},
all_node_names=["rally-node-0"],
ip="127.0.0.1",
http_port=9200,
node_root_dir=HOME_DIR + "/.rally/benchmarks/races/unittest")
Expand All @@ -344,6 +352,7 @@ def test_prepare_default_data_paths(self, mock_rm, mock_ensure_dir, mock_decompr
java_home="/usr/local/javas/java8",
node_name="rally-node-0",
all_node_ips=["10.17.22.22", "10.17.22.23"],
all_node_names=["rally-node-0", "rally-node-1"],
ip="10.17.22.23",
http_port=9200,
node_root_dir=HOME_DIR + "/.rally/benchmarks/races/unittest")
Expand All @@ -362,6 +371,7 @@ def test_prepare_default_data_paths(self, mock_rm, mock_ensure_dir, mock_decompr
"http_port": "9200-9300",
"transport_port": "9300-9400",
"all_node_ips": "[\"10.17.22.22\",\"10.17.22.23\"]",
"all_node_names": "[\"rally-node-0\",\"rally-node-1\"]",
"minimum_master_nodes": 2,
"install_root_path": "/install/elasticsearch-5.0.0-SNAPSHOT"
}, installer.variables)
Expand All @@ -380,6 +390,7 @@ def test_prepare_user_provided_data_path(self, mock_rm, mock_ensure_dir, mock_de
java_home="/usr/local/javas/java8",
node_name="rally-node-0",
all_node_ips=["10.17.22.22", "10.17.22.23"],
all_node_names=["rally-node-0", "rally-node-1"],
ip="10.17.22.23",
http_port=9200,
node_root_dir="~/.rally/benchmarks/races/unittest")
Expand All @@ -398,6 +409,7 @@ def test_prepare_user_provided_data_path(self, mock_rm, mock_ensure_dir, mock_de
"http_port": "9200-9300",
"transport_port": "9300-9400",
"all_node_ips": "[\"10.17.22.22\",\"10.17.22.23\"]",
"all_node_names": "[\"rally-node-0\",\"rally-node-1\"]",
"minimum_master_nodes": 2,
"install_root_path": "/install/elasticsearch-5.0.0-SNAPSHOT"
}, installer.variables)
Expand All @@ -412,6 +424,7 @@ def test_invokes_hook(self):
java_home="/usr/local/javas/java8",
node_name="rally-node-0",
all_node_ips=["10.17.22.22", "10.17.22.23"],
all_node_names=["rally-node-0", "rally-node-1"],
ip="10.17.22.23",
http_port=9200,
node_root_dir="~/.rally/benchmarks/races/unittest",
Expand Down