Skip to content

Commit

Permalink
Deltaproxy does not work with subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
dwoz committed Jan 10, 2022
1 parent fae15bf commit f5c25b4
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 38 deletions.
4 changes: 2 additions & 2 deletions salt/channel/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,10 @@ def send(self, load, tries=3, timeout=60, raw=False):
"""
try:
if self.crypt == "clear":
log.info("ReqChannel send clear load=%r", load)
log.trace("ReqChannel send clear load=%r", load)
ret = yield self._uncrypted_transfer(load, tries=tries, timeout=timeout)
else:
log.info("ReqChannel send crypt load=%r", load)
log.trace("ReqChannel send crypt load=%r", load)
ret = yield self._crypted_transfer(
load, tries=tries, timeout=timeout, raw=raw
)
Expand Down
15 changes: 0 additions & 15 deletions salt/channel/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -814,21 +814,6 @@ def wrap_payload(self, load):
else:
int_payload["topic_lst"] = load["tgt"]

## From TCP transport
# if load["tgt_type"] == "list" and not self.opts.get("order_masters", False):
# if isinstance(load["tgt"], str):
# # Fetch a list of minions that match
# _res = self.ckminions.check_minions(
# load["tgt"], tgt_type=load["tgt_type"]
# )
# match_ids = _res["minions"]

# log.debug("Publish Side Match: %s", match_ids)
# # Send list of miions thru so zmq can target them
# int_payload["topic_lst"] = match_ids
# else:
# int_payload["topic_lst"] = load["tgt"]

return int_payload

def publish(self, load):
Expand Down
22 changes: 1 addition & 21 deletions salt/transport/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,19 +601,6 @@ def close(self):
return
self._closing = True
self.io_loop.add_timeout(1, self.check_close)
return

# try:
# for msg_id in list(self.send_future_map):
# log.error("Closing before send future completed %r", msg_id)
# future = self.send_future_map.pop(msg_id)
# future.set_exception(ClosingError())
# self._tcp_client.close()
# # self._stream.close()
# finally:
# self._stream = None
# self._closing = False
# self._closed = True

@salt.ext.tornado.gen.coroutine
def check_close(self):
Expand Down Expand Up @@ -1306,7 +1293,7 @@ def handle_stream(self, stream, address):
# TODO: ACK the publish through IPC
@salt.ext.tornado.gen.coroutine
def publish_payload(self, package, topic_list=None):
log.error("TCP PubServer sending payload: %s \n\n %r", package, topic_list)
log.trace("TCP PubServer sending payload: %s \n\n %r", package, topic_list)
payload = salt.transport.frame.frame_msg(package)
to_remove = []
if topic_list:
Expand Down Expand Up @@ -1379,10 +1366,8 @@ def publish_daemon(
log_queue_level = kwargs.get("log_queue_level")
if log_queue_level is not None:
salt.log.setup.set_multiprocessing_logging_level(log_queue_level)
log.error("PUB D - a")
io_loop = salt.ext.tornado.ioloop.IOLoop()
io_loop.make_current()
log.error("PUB D - b")

# Spin up the publisher
self.pub_server = pub_server = PubServer(
Expand All @@ -1391,7 +1376,6 @@ def publish_daemon(
presence_callback=presence_callback,
remove_presence_callback=remove_presence_callback,
)
log.error("PUB D - c")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
_set_tcp_keepalive(sock, self.opts)
Expand All @@ -1407,7 +1391,6 @@ def publish_daemon(
else:
pull_uri = os.path.join(self.opts["sock_dir"], "publish_pull.ipc")
self.pub_server = pub_server
log.error("PUB D - d")
pull_sock = salt.transport.ipc.IPCMessageServer(
pull_uri,
io_loop=io_loop,
Expand All @@ -1421,13 +1404,11 @@ def publish_daemon(

# run forever
try:
log.error("PUB D - e")
io_loop.start()
except (KeyboardInterrupt, SystemExit):
pass
finally:
pull_sock.close()
log.error("PUB D - f")

def pre_fork(self, process_manager, kwargs=None):
"""
Expand All @@ -1441,7 +1422,6 @@ def pre_fork(self, process_manager, kwargs=None):

@salt.ext.tornado.gen.coroutine
def publish_payload(self, payload, *args):
log.error("PUB CHAN publish_payload")
ret = yield self.pub_server.publish_payload(payload, *args)
raise salt.ext.tornado.gen.Return(ret)

Expand Down
Empty file.
6 changes: 6 additions & 0 deletions tests/pytests/integration/proxy/test_deltaproxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@
log = logging.getLogger(__name__)


@pytest.fixture(scope="package", autouse=True)
def skip_on_tcp_transport(request):
if request.config.getoption("--transport") == "tcp":
pytest.skip("Deltaproxy under the TPC transport is not working. See #61367")


@pytest.fixture(scope="module", autouse=True)
def salt_delta_proxy(salt_delta_proxy):
"""
Expand Down

0 comments on commit f5c25b4

Please sign in to comment.