Skip to content

Commit

Permalink
Test fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
dwoz authored and garethgreenaway committed Aug 8, 2023
1 parent 077c253 commit 9683260
Show file tree
Hide file tree
Showing 22 changed files with 2,428 additions and 1,752 deletions.
4 changes: 4 additions & 0 deletions salt/_logging/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ def __init__(self, stream, max_queue_size=10000):
super().__init__(stream)
self.__messages = deque(maxlen=max_queue_size)
self.__emitting = False
import traceback

self.stack = "".join(traceback.format_stack())

def handle(self, record):
self.acquire()
Expand All @@ -115,6 +118,7 @@ def flush(self):
super().handle(record)
finally:
self.__emitting = False
# This will raise a ValueError if the file handle has been closed.
super().flush()

def sync_with_handlers(self, handlers=()):
Expand Down
10 changes: 9 additions & 1 deletion salt/_logging/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,15 @@ def setup_temp_handler(log_level=None):
break
else:
handler = DeferredStreamHandler(sys.stderr)
atexit.register(handler.flush)

def tryflush():
try:
handler.flush()
except ValueError:
# File handle has already been closed.
pass

atexit.register(tryflush)
handler.setLevel(log_level)

# Set the default temporary console formatter config
Expand Down
34 changes: 25 additions & 9 deletions salt/channel/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class AsyncReqChannel:
"_uncrypted_transfer",
"send",
"connect",
# "close",
]
close_methods = [
"close",
Expand Down Expand Up @@ -314,9 +315,8 @@ def _uncrypted_transfer(self, load, timeout):

raise tornado.gen.Return(ret)

@tornado.gen.coroutine
def connect(self):
yield self.transport.connect()
async def connect(self):
await self.transport.connect()

@tornado.gen.coroutine
def send(self, load, tries=None, timeout=None, raw=False):
Expand Down Expand Up @@ -367,6 +367,14 @@ def __enter__(self):
def __exit__(self, *args):
self.close()

async def __aenter__(self):
await self.connect()
return self

async def __aexit__(self, exc_type, exc, tb):
# print("AEXIT")
self.close()


class AsyncPubChannel:
"""
Expand All @@ -376,7 +384,7 @@ class AsyncPubChannel:
async_methods = [
"connect",
"_decode_messages",
# "close",
# "close",
]
close_methods = [
"close",
Expand Down Expand Up @@ -406,7 +414,9 @@ def factory(cls, opts, **kwargs):
io_loop = tornado.ioloop.IOLoop.current()

auth = salt.crypt.AsyncAuth(opts, io_loop=io_loop)
transport = salt.transport.publish_client(opts, io_loop)
host = opts.get("master_ip", "127.0.0.1")
port = int(opts.get("publish_port", 4506))
transport = salt.transport.publish_client(opts, io_loop, host=host, port=port)
return cls(opts, transport, auth, io_loop)

def __init__(self, opts, transport, auth, io_loop=None):
Expand All @@ -432,6 +442,7 @@ def connect(self):
try:
if not self.auth.authenticated:
yield self.auth.authenticate()
# log.error("*** Creds %r", self.auth.creds)
# if this is changed from the default, we assume it was intentional
if int(self.opts.get("publish_port", 4506)) != 4506:
publish_port = self.opts.get("publish_port")
Expand All @@ -447,6 +458,8 @@ def connect(self):
except KeyboardInterrupt: # pylint: disable=try-except-raise
raise
except Exception as exc: # pylint: disable=broad-except
# TODO: Basing re-try logic off exception messages is brittle and
# prone to errors; use exception types or some other method.
if "-|RETRY|-" not in str(exc):
raise salt.exceptions.SaltClientError(
f"Unable to sign_in to master: {exc}"
Expand All @@ -456,11 +469,8 @@ def close(self):
"""
Close the channel
"""
log.error("AsyncPubChannel.close called")
self.transport.close()
log.error("Transport closed")
if self.event is not None:
log.error("Event destroy called")
self.event.destroy()
self.event = None

Expand Down Expand Up @@ -616,7 +626,13 @@ def __enter__(self):
return self

def __exit__(self, *args):
self.close()
self.io_loop.spawn_callback(self.close)

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc, tb):
await self.close()


class AsyncPushChannel:
Expand Down
7 changes: 4 additions & 3 deletions salt/channel/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,9 +699,6 @@ class PubServerChannel:
Factory class to create subscription channels to the master's Publisher
"""

def __repr__(self):
return f"<PubServerChannel pub_uri={self.transport.pub_uri} pull_uri={self.transport.pull_uri} at {id(self)}>"

@classmethod
def factory(cls, opts, **kwargs):
if "master_uri" not in opts and "master_uri" in kwargs:
Expand All @@ -720,6 +717,9 @@ def factory(cls, opts, **kwargs):
transport = salt.transport.publish_server(opts, **kwargs)
return cls(opts, transport, presence_events=presence_events)

def __repr__(self):
return f"<PubServerChannel pub_uri={self.transport.pub_uri} pull_uri={self.transport.pull_uri} at {id(self)}>"

def __init__(self, opts, transport, presence_events=False):
self.opts = opts
self.ckminions = salt.utils.minions.CkMinions(self.opts)
Expand Down Expand Up @@ -774,6 +774,7 @@ def _publish_daemon(self, **kwargs):
secrets = kwargs.get("secrets", None)
if secrets is not None:
salt.master.SMaster.secrets = secrets
log.error("RUN TRANSPORT PUBD")
self.transport.publish_daemon(
self.publish_payload, self.presence_callback, self.remove_presence_callback
)
Expand Down
2 changes: 1 addition & 1 deletion salt/cli/salt.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ def run(self):
AuthorizationError,
SaltInvocationError,
EauthAuthenticationError,
SaltClientError,
# SaltClientError,
) as exc:
print(repr(exc))
ret = str(exc)
Expand Down
2 changes: 1 addition & 1 deletion salt/crypt.py
Original file line number Diff line number Diff line change
Expand Up @@ -1527,7 +1527,7 @@ def loads(self, data, raw=False, nonce=None):
ret_nonce = data[:32].decode()
data = data[32:]
if ret_nonce != nonce:
raise SaltClientError("Nonce verification error")
raise SaltClientError(f"Nonce verification error {ret_nonce} {nonce}")
payload = salt.payload.loads(data, raw=raw)
if isinstance(payload, dict):
if "serial" in payload:
Expand Down
57 changes: 45 additions & 12 deletions salt/minion.py
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,30 @@ def _bind(self):
# self.opts,
# io_loop=self.io_loop,
# )

# import hashlib
# ipc_publisher = salt.transport.publish_server(self.opts)
# hash_type = getattr(hashlib, self.opts["hash_type"])
# id_hash = hash_type(
# salt.utils.stringutils.to_bytes(self.opts["id"])
# ).hexdigest()[:10]
# epub_sock_path = "ipc://{}".format(
# os.path.join(
# self.opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash)
# )
# )
# if os.path.exists(epub_sock_path):
# os.unlink(epub_sock_path)
# epull_sock_path = "ipc://{}".format(
# os.path.join(
# self.opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash)
# )
# )
# ipc_publisher.pub_uri = epub_sock_path
# ipc_publisher.pull_uri = epull_sock_path
# self.io_loop.add_callback(ipc_publisher.publisher, ipc_publisher.publish_payload, self.io_loop)
import hashlib

ipc_publisher = salt.transport.publish_server(self.opts)
hash_type = getattr(hashlib, self.opts["hash_type"])
id_hash = hash_type(
Expand All @@ -1068,7 +1091,18 @@ def _bind(self):
)
ipc_publisher.pub_uri = epub_sock_path
ipc_publisher.pull_uri = epull_sock_path
self.io_loop.add_callback(ipc_publisher.publisher, ipc_publisher.publish_payload, self.io_loop)
if self.opts["transport"] == "tcp":

def target():
ipc_publisher.publish_daemon(ipc_publisher.publish_payload)

proc = salt.utils.process.Process(target=target, daemon=True)
proc.start()
else:
self.io_loop.add_callback(
ipc_publisher.publisher, ipc_publisher.publish_payload, self.io_loop
)
log.error("get event ")
self.event = salt.utils.event.get_event(
"minion", opts=self.opts, io_loop=self.io_loop
)
Expand Down Expand Up @@ -1139,11 +1173,8 @@ def _spawn_minions(self, timeout=60):
self.io_loop.spawn_callback(self._connect_minion, minion)
self.io_loop.call_later(timeout, self._check_minions)

@tornado.gen.coroutine
def _connect_minion(self, minion):
"""
Create a minion, and asynchronously connect it to a master
"""
async def _connect_minion(self, minion):
"""Create a minion, and asynchronously connect it to a master"""
last = 0 # never have we signed in
auth_wait = minion.opts["acceptance_wait_time"]
failed = False
Expand All @@ -1154,7 +1185,8 @@ def _connect_minion(self, minion):
if minion.opts.get("scheduler_before_connect", False):
minion.setup_scheduler(before_connect=True)
if minion.opts.get("master_type", "str") != "disable":
yield minion.connect_master(failed=failed)
await minion.connect_master(failed=failed)
log.error("RUN MINION TUNE IN")
minion.tune_in(start=False)
self.minions.append(minion)
break
Expand All @@ -1164,11 +1196,12 @@ def _connect_minion(self, minion):
"Error while bringing up minion for multi-master. Is "
"master at %s responding?",
minion.opts["master"],
exc_info=True,
)
last = time.time()
if auth_wait < self.max_auth_wait:
auth_wait += self.auth_wait
yield tornado.gen.sleep(auth_wait) # TODO: log?
await tornado.gen.sleep(auth_wait) # TODO: log?
except SaltMasterUnresolvableError:
err = (
"Master address: '{}' could not be resolved. Invalid or"
Expand Down Expand Up @@ -3269,16 +3302,16 @@ def destroy(self):
self.pub_channel.on_recv(None)
log.error("create pub_channel.close task %r", self)
self.pub_channel.close()
#self.io_loop.asyncio_loop.run_until_complete(self.pub_channel.close())
#if hasattr(self.pub_channel, "close"):
# self.io_loop.asyncio_loop.run_until_complete(self.pub_channel.close())
# if hasattr(self.pub_channel, "close"):
# asyncio.create_task(
# self.pub_channel.close()
# )
# #self.pub_channel.close()
#del self.pub_channel
# del self.pub_channel
if hasattr(self, "event"):
log.error("HAS EVENT")
#if hasattr(self, "periodic_callbacks"):
# if hasattr(self, "periodic_callbacks"):
# for cb in self.periodic_callbacks.values():
# cb.stop()
log.error("%r destroy method finished", self)
Expand Down
27 changes: 26 additions & 1 deletion salt/transport/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def publish_server(opts, **kwargs):
raise Exception("Transport type not found: {}".format(ttype))


def publish_client(opts, io_loop):
def ipc_publish_client(opts, io_loop):
# Default to ZeroMQ for now
ttype = "zeromq"
# determine the ttype
Expand All @@ -85,6 +85,7 @@ def publish_client(opts, io_loop):
# switch on available ttypes
if ttype == "zeromq":
import salt.transport.zeromq

return salt.transport.zeromq.PublishClient(opts, io_loop)
elif ttype == "tcp":
import salt.transport.tcp
Expand All @@ -93,6 +94,30 @@ def publish_client(opts, io_loop):
raise Exception("Transport type not found: {}".format(ttype))


def publish_client(opts, io_loop, host=None, port=None, path=None):
# Default to ZeroMQ for now
ttype = "zeromq"
# determine the ttype
if "transport" in opts:
ttype = opts["transport"]
elif "transport" in opts.get("pillar", {}).get("master", {}):
ttype = opts["pillar"]["master"]["transport"]
# switch on available ttypes
if ttype == "zeromq":
import salt.transport.zeromq

return salt.transport.zeromq.PublishClient(
opts, io_loop, host=host, port=port, path=path
)
elif ttype == "tcp":
import salt.transport.tcp

return salt.transport.tcp.TCPPubClient(
opts, io_loop, host=host, port=port, path=path
)
raise Exception("Transport type not found: {}".format(ttype))


class RequestClient:
"""
The RequestClient transport is used to make requests and get corresponding
Expand Down
23 changes: 11 additions & 12 deletions salt/transport/ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,10 @@ def start(self):
else:
self.sock = tornado.netutil.bind_unix_socket(self.socket_path)

with salt.utils.asynchronous.current_ioloop(self.io_loop):
tornado.netutil.add_accept_handler(
self.sock,
self.handle_connection,
)
tornado.netutil.add_accept_handler(
self.sock,
self.handle_connection,
)
self._started = True

@tornado.gen.coroutine
Expand Down Expand Up @@ -208,7 +207,7 @@ def return_message(msg):
log.error("Exception occurred while handling stream: %s", exc)

def handle_connection(self, connection, address):
log.trace(
log.error(
"IPCServer: Handling connection to address: %s",
address if address else connection,
)
Expand Down Expand Up @@ -338,8 +337,8 @@ def _connect(self, timeout=None):
break

if self.stream is None:
with salt.utils.asynchronous.current_ioloop(self.io_loop):
self.stream = IOStream(socket.socket(sock_type, socket.SOCK_STREAM))
# with salt.utils.asynchronous.current_ioloop(self.io_loop):
self.stream = IOStream(socket.socket(sock_type, socket.SOCK_STREAM))
try:
log.trace("IPCClient: Connecting to socket: %s", self.socket_path)
yield self.stream.connect(sock_addr)
Expand Down Expand Up @@ -440,8 +439,8 @@ class IPCMessageClient(IPCClient):

# FIXME timeout unimplemented
# FIXME tries unimplemented
@tornado.gen.coroutine
def send(self, msg, timeout=None, tries=None):
# @tornado.gen.coroutine
async def send(self, msg, timeout=None, tries=None):
"""
Send a message to an IPC socket
Expand All @@ -451,9 +450,9 @@ def send(self, msg, timeout=None, tries=None):
:param int timeout: Timeout when sending message (Currently unimplemented)
"""
if not self.connected():
yield self.connect()
await self.connect()
pack = salt.transport.frame.frame_msg_ipc(msg, raw_body=True)
yield self.stream.write(pack)
await self.stream.write(pack)


class IPCMessageServer(IPCServer):
Expand Down
Loading

0 comments on commit 9683260

Please sign in to comment.