From 1679b5010738cf2feb33dc5984c9e436f65ade01 Mon Sep 17 00:00:00 2001 From: "Daniel A. Wozniak" Date: Thu, 24 Aug 2023 15:38:42 -0700 Subject: [PATCH] Only return job results to originating master. --- changelog/62834.fixed.md | 1 + salt/minion.py | 32 ++++-- .../integration/minion/test_job_return.py | 107 ++++++++++++++++++ 3 files changed, 130 insertions(+), 10 deletions(-) create mode 100644 changelog/62834.fixed.md create mode 100644 tests/pytests/integration/minion/test_job_return.py diff --git a/changelog/62834.fixed.md b/changelog/62834.fixed.md new file mode 100644 index 000000000000..c3fa1e5c03e9 --- /dev/null +++ b/changelog/62834.fixed.md @@ -0,0 +1 @@ +Job returns are only sent to originating master diff --git a/salt/minion.py b/salt/minion.py index 2ee1387c6ec0..4225022042b5 100644 --- a/salt/minion.py +++ b/salt/minion.py @@ -1607,7 +1607,9 @@ def _send_req_sync(self, load, timeout): "minion", opts=self.opts, listen=False ) as event: return event.fire_event( - load, "__master_req_channel_payload", timeout=timeout + load, + f"__master_req_channel_payload/{self.opts['master']}", + timeout=timeout, ) @salt.ext.tornado.gen.coroutine @@ -1624,7 +1626,9 @@ def _send_req_async(self, load, timeout): "minion", opts=self.opts, listen=False ) as event: ret = yield event.fire_event_async( - load, "__master_req_channel_payload", timeout=timeout + load, + f"__master_req_channel_payload/{self.opts['master']}", + timeout=timeout, ) raise salt.ext.tornado.gen.Return(ret) @@ -2709,14 +2713,22 @@ def handle_event(self, package): notify=data.get("notify", False), ) elif tag.startswith("__master_req_channel_payload"): - try: - yield _minion.req_channel.send( - data, - timeout=_minion._return_retry_timer(), - tries=_minion.opts["return_retry_tries"], + job_master = tag.rsplit("/", 1)[1] + if job_master == self.opts["master"]: + try: + yield _minion.req_channel.send( + data, + timeout=_minion._return_retry_timer(), + tries=_minion.opts["return_retry_tries"], + ) + except salt.exceptions.SaltReqTimeoutError: + log.error("Timeout encountered while sending %r request", data) + else: + log.debug( + "Skipping job return for other master: jid=%s master=%s", + data["jid"], + job_master, ) - except salt.exceptions.SaltReqTimeoutError: - log.error("Timeout encountered while sending %r request", data) elif tag.startswith("pillar_refresh"): yield _minion.pillar_refresh( force_refresh=data.get("force_refresh", False), @@ -3320,7 +3332,7 @@ def timeout_handler(*args): data["to"], io_loop=self.io_loop, callback=lambda _: None, - **kwargs + **kwargs, ) def _send_req_sync(self, load, timeout): diff --git a/tests/pytests/integration/minion/test_job_return.py b/tests/pytests/integration/minion/test_job_return.py new file mode 100644 index 000000000000..19f25e8baa20 --- /dev/null +++ b/tests/pytests/integration/minion/test_job_return.py @@ -0,0 +1,107 @@ +import os +import shutil +import subprocess + +import pytest + +import salt.utils.platform + + +@pytest.fixture +def salt_master_1(request, salt_factories): + config_defaults = { + "open_mode": True, + "transport": request.config.getoption("--transport"), + } + config_overrides = { + "interface": "127.0.0.1", + } + + factory = salt_factories.salt_master_daemon( + "master-1", + defaults=config_defaults, + overrides=config_overrides, + extra_cli_arguments_after_first_start_failure=["--log-level=info"], + ) + with factory.started(start_timeout=120): + yield factory + + +@pytest.fixture +def salt_master_2(salt_factories, salt_master_1): + if salt.utils.platform.is_darwin() or salt.utils.platform.is_freebsd(): + subprocess.check_output(["ifconfig", "lo0", "alias", "127.0.0.2", "up"]) + + config_defaults = { + "open_mode": True, + "transport": salt_master_1.config["transport"], + } + config_overrides = { + "interface": "127.0.0.2", + } + + # Use the same ports for both masters, they are binding to different interfaces + for key in ( + "ret_port", + "publish_port", + ): + config_overrides[key] = salt_master_1.config[key] + factory = salt_factories.salt_master_daemon( + "master-2", + defaults=config_defaults, + overrides=config_overrides, + extra_cli_arguments_after_first_start_failure=["--log-level=info"], + ) + + # The secondary salt master depends on the primarily salt master fixture + # because we need to clone the keys + for keyfile in ("master.pem", "master.pub"): + shutil.copyfile( + os.path.join(salt_master_1.config["pki_dir"], keyfile), + os.path.join(factory.config["pki_dir"], keyfile), + ) + with factory.started(start_timeout=120): + yield factory + + +@pytest.fixture +def salt_minion_1(salt_master_1, salt_master_2): + config_defaults = { + "transport": salt_master_1.config["transport"], + } + + master_1_port = salt_master_1.config["ret_port"] + master_1_addr = salt_master_1.config["interface"] + master_2_port = salt_master_2.config["ret_port"] + master_2_addr = salt_master_2.config["interface"] + config_overrides = { + "master": [ + "{}:{}".format(master_1_addr, master_1_port), + "{}:{}".format(master_2_addr, master_2_port), + ], + "test.foo": "baz", + } + factory = salt_master_1.salt_minion_daemon( + "minion-1", + defaults=config_defaults, + overrides=config_overrides, + extra_cli_arguments_after_first_start_failure=["--log-level=info"], + ) + with factory.started(start_timeout=120): + yield factory + + +def test_job_resturn(salt_master_1, salt_master_2, salt_minion_1): + cli = salt_master_1.salt_cli(timeout=120) + ret = cli.run("test.ping", "-v", minion_tgt="minion-1") + for line in ret.stdout.splitlines(): + if "with jid" in line: + jid = line.split("with jid")[1].strip() + + run_1 = salt_master_1.salt_run_cli(timeout=120) + ret = run_1.run("jobs.lookup_jid", jid) + assert ret.data == {"minion-1": True} + + run_2 = salt_master_2.salt_run_cli(timeout=120) + ret = run_2.run("jobs.lookup_jid", jid) + assert ret.data == {}