Skip to content

Commit

Permalink
Only the cluster aes key is on disk
Browse files Browse the repository at this point in the history
  • Loading branch information
dwoz committed Aug 21, 2023
1 parent 6f815d7 commit 35e3482
Show file tree
Hide file tree
Showing 7 changed files with 484 additions and 179 deletions.
62 changes: 1 addition & 61 deletions salt/channel/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ def crypted_transfer_decode_dictentry(
tries,
timeout,
)
log.error("WTF %r", ret)
if HAS_M2:
aes = key.private_decrypt(ret["key"], RSA.pkcs1_oaep_padding)
else:
Expand Down Expand Up @@ -670,64 +671,3 @@ def factory(opts, **kwargs):
import salt.transport.ipc

return salt.transport.ipc.IPCMessageServer(opts, **kwargs)


class AsyncMasterPubChannel:
    """
    Channel that forwards connect/recv/on_recv/close calls to a publish
    client transport.

    ``factory`` builds the transport with
    ``salt.transport.ipc_publish_client(opts, "master")``; the constructor
    accepts any transport object exposing the same methods.
    """

    # Method-name lists exposed as class attributes; presumably consumed by a
    # sync wrapper elsewhere in the project -- verify against the caller.
    async_methods = [
        "connect",
    ]

    close_methods = [
        "close",
    ]

    @classmethod
    def factory(cls, opts, **kwargs):
        """
        Build a channel for the "master" IPC publish client.

        :param opts: Options dictionary handed to the transport and channel.
        :param kwargs: Only ``io_loop`` is read; when it is missing or
            ``None`` the current tornado IOLoop is used.
        :return: A new channel instance with ``auth`` set to ``None``.
        """
        io_loop = kwargs.get("io_loop")
        if io_loop is None:
            io_loop = tornado.ioloop.IOLoop.current()
        transport = salt.transport.ipc_publish_client(opts, "master")
        return cls(opts, transport, None, io_loop)

    def __init__(self, opts, transport, auth, io_loop=None):
        self.opts = opts
        self.io_loop = io_loop
        self.auth = auth
        self.transport = transport
        self._closing = False
        self._reconnected = False

    async def connect(self):
        """
        Return a future which completes when connected to the remote publisher
        """
        await self.transport.connect()

    async def recv(self, timeout=None):
        """
        Await and return the transport's ``recv(timeout)`` result
        """
        return await self.transport.recv(timeout)

    def close(self):
        """
        Close the channel
        """
        self.transport.close()

    def on_recv(self, callback=None):
        """
        When jobs are received pass them (decoded) to callback
        """
        return self.transport.on_recv(callback)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # ``io_loop`` defaults to None in ``__init__``; without a loop to
        # schedule on, close right away instead of raising AttributeError.
        if self.io_loop is None:
            self.close()
        else:
            self.io_loop.spawn_callback(self.close)

    async def __aenter__(self):
        return self

    async def __aexit__(self, *_):
        # ``close`` is a plain method that returns None, so it must not be
        # awaited: ``await None`` raises TypeError on every ``async with`` exit.
        self.close()
122 changes: 93 additions & 29 deletions salt/channel/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import asyncio
import binascii
import collections
import hashlib
import logging
import os
Expand Down Expand Up @@ -144,7 +145,7 @@ def handle_message(self, payload):
raise tornado.gen.Return("bad load: id contains a null byte")
except TypeError:
log.error("Payload contains non-string id: %s", payload)
raise tornado.gen.Return("bad load: id {} is not a string".format(id_))
raise tornado.gen.Return(f"bad load: id {id_} is not a string")

version = 0
if "version" in payload:
Expand Down Expand Up @@ -244,12 +245,16 @@ def _update_aes(self):
"""
import salt.master

key = "aes"
if self.opts.get("cluster_id", None):
key = "cluster_aes"

if (
salt.master.SMaster.secrets["aes"]["secret"].value
salt.master.SMaster.secrets[key]["secret"].value
!= self.crypticle.key_string
):
self.crypticle = salt.crypt.Crypticle(
self.opts, salt.master.SMaster.secrets["aes"]["secret"].value
self.opts, salt.master.SMaster.secrets[key]["secret"].value
)
return True
return False
Expand Down Expand Up @@ -648,7 +653,7 @@ def _auth(self, load, sign_messages=False):
)
else:
mtoken = mcipher.decrypt(load["token"])
aes = "{}_|-{}".format(self.aes_key, mtoken)
aes = f"{self.aes_key}_|-{mtoken}"
except Exception: # pylint: disable=broad-except
# Token failed to decrypt, send back the salty bacon to
# support older minions
Expand Down Expand Up @@ -999,20 +1004,23 @@ def _publish_daemon(self, **kwargs):
self.io_loop = tornado.ioloop.IOLoop.current()
tcp_master_pool_port = 4520
self.pushers = []
for master in self.opts.get("cluster_peers", []):
self.auth_errors = {}
for peer in self.opts.get("cluster_peers", []):
pusher = salt.transport.tcp.TCPPublishServer(
self.opts,
pull_host=master,
pull_host=peer,
pull_port=tcp_master_pool_port,
)
self.auth_errors[peer] = collections.deque()
self.pushers.append(pusher)
self.pool_puller = salt.transport.tcp.TCPPuller(
host=self.opts["interface"],
port=tcp_master_pool_port,
io_loop=self.io_loop,
payload_handler=self.handle_pool_publish,
)
self.pool_puller.start()
if self.opts.get("cluster_id", None):
self.pool_puller = salt.transport.tcp.TCPPuller(
host=self.opts["interface"],
port=tcp_master_pool_port,
io_loop=self.io_loop,
payload_handler=self.handle_pool_publish,
)
self.pool_puller.start()
self.io_loop.add_callback(
self.transport.publisher,
self.publish_payload,
Expand Down Expand Up @@ -1061,20 +1069,46 @@ async def handle_pool_publish(self, payload, _):
if self.peer_keys[peer] != key_str:
self.peer_keys[peer] = key_str
self.send_aes_key_event()
while self.auth_errors[peer]:
key, data = self.auth_errors[peer].popleft()
peer_id, parsed_tag = self.parse_cluster_tag(tag)
try:
event_data = self.extract_cluster_event(peer_id, data)
except salt.exceptions.AuthenticationError:
log.error(
"Event from peer failed authentication: %s", peer_id
)
else:
await self.transport.publish_payload(
salt.utils.event.SaltEvent.pack(
parsed_tag, event_data
)
)
else:
self.peer_keys[peer] = key_str
self.send_aes_key_event()
while self.auth_errors[peer]:
key, data = self.auth_errors[peer].popleft()
peer_id, parsed_tag = self.parse_cluster_tag(tag)
try:
event_data = self.extract_cluster_event(peer_id, data)
except salt.exceptions.AuthenticationError:
log.error(
"Event from peer failed authentication: %s", peer_id
)
else:
await self.transport.publish_payload(
salt.utils.event.SaltEvent.pack(parsed_tag, event_data)
)
elif tag.startswith("cluster/event"):
peer_id = tag.replace("cluster/event/", "").split("/")[0]
stripped_tag = tag.replace(f"cluster/event/{peer_id}/", "")
if peer_id in self.peer_keys:
crypticle = salt.crypt.Crypticle(self.opts, self.peer_keys[peer_id])
event_data = crypticle.loads(data)
# __peer_id can be used to know if this event came from a
# different master?
event_data["__peer_id"] = peer_id
peer_id, parsed_tag = self.parse_cluster_tag(tag)
try:
event_data = self.extract_cluster_event(peer_id, data)
except salt.exceptions.AuthenticationError:
self.auth_errors[peer_id].append((tag, data))
else:
await self.transport.publish_payload(
salt.utils.event.SaltEvent.pack(stripped_tag, event_data)
salt.utils.event.SaltEvent.pack(parsed_tag, event_data)
)
else:
log.error("This cluster tag not valid %s", tag)
Expand All @@ -1083,18 +1117,37 @@ async def handle_pool_publish(self, payload, _):
log.critical("Unexpected error while polling master events", exc_info=True)
return None

def parse_cluster_tag(self, tag):
    """
    Split a ``cluster/event/<peer_id>/<tag>`` event tag.

    Only the leading ``cluster/event/<peer_id>/`` prefix is stripped, so a
    tag that happens to contain the same text again further along is left
    intact.

    :param tag: Event tag string.
    :return: ``(peer_id, stripped_tag)``. When the tag does not start with
        ``cluster/event/`` or has nothing after the peer id, the second
        element is the tag unchanged.
    """
    prefix = "cluster/event/"
    has_prefix = tag.startswith(prefix)
    remainder = tag[len(prefix):] if has_prefix else tag
    peer_id, separator, stripped_tag = remainder.partition("/")
    if not has_prefix or not separator:
        # Nothing to strip: hand back the tag untouched.
        return peer_id, tag
    return peer_id, stripped_tag

def extract_cluster_event(self, peer_id, data):
    """
    Decode an event received from a cluster peer.

    Builds a ``Crypticle`` from the key stored for ``peer_id`` in
    ``self.peer_keys``, loads ``data`` with it and returns the
    ``"event_payload"`` entry with ``"__peer_id"`` added.

    :param peer_id: Key into ``self.peer_keys``.
    :param data: Payload passed to ``Crypticle.loads``.
    :raises salt.exceptions.AuthenticationError: When no key is stored for
        ``peer_id``. Errors from ``Crypticle.loads`` propagate unchanged.
    """
    if peer_id not in self.peer_keys:
        raise salt.exceptions.AuthenticationError("Peer aes key not available")
    peer_crypticle = salt.crypt.Crypticle(self.opts, self.peer_keys[peer_id])
    payload = peer_crypticle.loads(data)["event_payload"]
    # Record which peer the event came from; possibly useful for telling
    # local events apart from ones forwarded by another master.
    payload["__peer_id"] = peer_id
    return payload

async def publish_payload(self, load, *args):
log.error("Publish event to local ipc clients")
tag, data = salt.utils.event.SaltEvent.unpack(load)
tasks = []
if not tag.startswith("cluster/peer"):
tasks = [asyncio.create_task(self.transport.publish_payload(load))]
tasks = [
asyncio.create_task(
self.transport.publish_payload(load), name=self.opts["id"]
)
]
for pusher in self.pushers:
log.error(
"Publish event to peer master %s:%s", pusher.pull_host, pusher.pull_port
)
log.debug("Publish event to peer %s:%s", pusher.pull_host, pusher.pull_port)
if tag.startswith("cluster/peer"):
tasks.append(asyncio.create_task(pusher.publish(load)))
tasks.append(
asyncio.create_task(pusher.publish(load), name=pusher.pull_host)
)
continue
crypticle = salt.crypt.Crypticle(
self.opts, salt.master.SMaster.secrets["aes"]["secret"].value
Expand All @@ -1109,5 +1162,16 @@ async def publish_payload(self, load, *args):
for task in tasks:
try:
task.result()
except Exception: # pylint: disable=broad-except
log.error("Error sending task %s", exc)
# XXX This error is transport specific and should be something else
except tornado.iostream.StreamClosedError:
if task.get_name() == self.opts["id"]:
log.error("Unable to forward event to local ipc bus")
else:
log.warning(
"Unable to forward event to cluster peer %s", task.get_name()
)
continue
except Exception as exc: # pylint: disable=broad-except
log.error(
"Unhandled error sending task %s", task.get_name(), exc_info=True
)
Loading

0 comments on commit 35e3482

Please sign in to comment.