Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Force ZMQ bind-connect logic #165

Merged
merged 2 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 33 additions & 18 deletions src/fprime_gds/common/zmq_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

@author lestarch
"""

import logging
import struct
from typing import Tuple
Expand All @@ -24,6 +25,7 @@

LOGGER = logging.getLogger("transport")


class ZmqWrapper(object):
"""Handler for ZMQ functions for use in other objects"""

Expand All @@ -50,7 +52,9 @@ def configure(self, transport_url: Tuple[str], sub_topic: bytes, pub_topic: byte
sub_topic: subscription topic used to filter incoming messages
pub_topic: publication topic supplied for remote subscription filters
"""
assert len(transport_url) == 2, f"Must supply a pair of URLs for ZeroMQ not '{transport_url}'"
assert (
len(transport_url) == 2
), f"Must supply a pair of URLs for ZeroMQ not '{transport_url}'"
self.pub_topic = pub_topic
self.sub_topic = sub_topic
self.transport_url = transport_url
Expand Down Expand Up @@ -78,8 +82,12 @@ def connect_outgoing(self):
The connection is made using self.transport_url, and as such, this must be configured before running. This is
intended to be called on the sending thread.
"""
assert self.transport_url is not None and len(self.transport_url) == 2, "Must configure before connecting"
assert self.zmq_socket_outgoing is None, "Cannot connect outgoing multiple times"
assert (
self.transport_url is not None and len(self.transport_url) == 2
), "Must configure before connecting"
assert (
self.zmq_socket_outgoing is None
), "Cannot connect outgoing multiple times"
assert self.pub_topic is not None, "Must configure sockets before connecting"
self.zmq_socket_outgoing = self.context.socket(zmq.PUB)
self.zmq_socket_outgoing.setsockopt(zmq.SNDHWM, 0)
Expand All @@ -93,7 +101,7 @@ def connect_outgoing(self):
self.zmq_socket_outgoing.connect(self.transport_url[0])

def connect_incoming(self):
""" Sets up a ZeroMQ connection for incoming data
"""Sets up a ZeroMQ connection for incoming data

ZeroMQ allows multiple connections to a single endpoint. This only affects incoming connections as sockets must
be created on their owning threads. This will connect the ZeroMQ topology and if self.server is set, will bind
Expand All @@ -102,8 +110,12 @@ def connect_incoming(self):
The connection is made using self.transport_url, and as such, this must be configured before running. This is
intended to be called on the receiving thread.
"""
assert self.transport_url is not None and len(self.transport_url) == 2, "Must configure before connecting"
assert self.zmq_socket_incoming is None, "Cannot connect incoming multiple times"
assert (
self.transport_url is not None and len(self.transport_url) == 2
), "Must configure before connecting"
assert (
self.zmq_socket_incoming is None
), "Cannot connect incoming multiple times"
assert self.sub_topic is not None, "Must configure sockets before connecting"
self.zmq_socket_incoming = self.context.socket(zmq.SUB)
self.zmq_socket_incoming.setsockopt(zmq.RCVHWM, 0)
Expand All @@ -125,7 +137,7 @@ def disconnect_incoming(self):
self.zmq_socket_incoming.close()

def terminate(self):
""" Terminate the ZeroMQ context"""
"""Terminate the ZeroMQ context"""
self.context.term()

def recv(self, timeout=None):
Expand Down Expand Up @@ -162,11 +174,14 @@ def __init__(self):
self.zmq = ZmqWrapper()

def connect(
self, transport_url: Tuple[str], sub_routing: RoutingTag, pub_routing: RoutingTag
self,
transport_url: Tuple[str],
sub_routing: RoutingTag,
pub_routing: RoutingTag,
):
"""Connects to the ZeroMQ network"""
self.zmq.configure(transport_url, sub_routing.value, pub_routing.value)
self.zmq.connect_outgoing() # Outgoing socket, for clients, exists on the current thread
self.zmq.connect_outgoing() # Outgoing socket, for clients, exists on the current thread
super().connect(transport_url, sub_routing, pub_routing)

def disconnect(self):
Expand All @@ -176,16 +191,18 @@ def disconnect(self):

def send(self, data):
"""Send data via ZeroMQ"""
if data[:4] == b'ZZZZ':
if data[:4] == b"ZZZZ":
data = data[4:]
self.zmq.send(data) # Must strip out ZZZZ as that is a ThreadedTcpServer only property
self.zmq.send(
data
) # Must strip out ZZZZ as that is a ThreadedTcpServer only property

def recv(self, timeout=None):
"""Receives data from ZeroMQ"""
return self.zmq.recv(timeout)

def recv_thread(self):
""" Overrides the recv_thread method
"""Overrides the recv_thread method

Overrides the recv_thread method of the superclass such that the ZeroMQ socket may be created/destroyed
before/after the main recv loop.
Expand All @@ -203,11 +220,11 @@ class ZmqGround(GroundHandler):
to the display and processing layer(s). This effectively acts as the "FSW" side of that interface as it
frames/deframes packets heading to that layer.

Since there is likely only one communications client to the FSW users should call make_server() after construction
Since there is likely only one communications client to the FSW users should instantiate with server=True
to ensure that it binds to resources for the network. This is not forced in case of multiple FSW connections.
"""

def __init__(self, transport_url):
def __init__(self, transport_url, server=True):
"""Initialize this interface with the transport_url needed to connect

Args:
Expand All @@ -217,6 +234,8 @@ def __init__(self, transport_url):
self.zmq = ZmqWrapper()
self.transport_url = transport_url
self.timeout = 10
if server:
self.zmq.make_server()

def open(self):
"""Open this ground interface. Delegates to the connect method
Expand All @@ -242,10 +261,6 @@ def close(self):
self.zmq.disconnect_outgoing()
self.zmq.terminate()

def make_server(self):
"""Makes it into a server"""
self.zmq.make_server()

def receive_all(self):
"""Receive all available packets

Expand Down
87 changes: 52 additions & 35 deletions src/fprime_gds/executables/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

@author mstarch
"""

import argparse
import datetime
import errno
Expand Down Expand Up @@ -85,8 +86,9 @@ def get_parser(self) -> argparse.ArgumentParser:
argparse parser for supplied arguments
"""
parser = argparse.ArgumentParser(
description=self.description, add_help=True,
formatter_class=argparse.ArgumentDefaultsHelpFormatter
description=self.description,
add_help=True,
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
for flags, keywords in self.get_arguments().items():
parser.add_argument(*flags, **keywords)
Expand All @@ -98,7 +100,7 @@ def reproduce_cli_args(self, args_ns):
def flag_member(flags, argparse_inputs) -> Tuple[str, str]:
"""Get the best CLI flag and namespace member"""
best_flag = (
[flag for flag in flags if flag.startswith("--")] + list(flags)
[flag for flag in flags if flag.startswith("--")] + list(flags)
)[0]
member = argparse_inputs.get(
"dest", re.sub(r"^-+", "", best_flag).replace("-", "_")
Expand All @@ -119,7 +121,7 @@ def cli_arguments(flags, argparse_inputs) -> List[str]:

# Handle arguments
if (action == "store_true" and value) or (
action == "store_false" and not value
action == "store_false" and not value
):
return [best_flag]
elif action != "store" or value is None:
Expand Down Expand Up @@ -150,10 +152,10 @@ def handle_arguments(self, args, **kwargs):

@staticmethod
def parse_args(
parser_classes,
description="No tool description provided",
arguments=None,
**kwargs,
parser_classes,
description="No tool description provided",
arguments=None,
**kwargs,
):
"""Parse and post-process arguments

Expand Down Expand Up @@ -236,7 +238,7 @@ def handle_arguments(self, args, **kwargs):
raise Exception(msg)
# Works for the old structure where the bin, lib, and dict directories live immediately under the platform
elif len(child_directories) == 3 and set(
[path.name for path in child_directories]
[path.name for path in child_directories]
) == {"bin", "lib", "dict"}:
args.deployment = detected_toolchain
return args
Expand All @@ -248,39 +250,46 @@ def handle_arguments(self, args, **kwargs):


class PluginArgumentParser(ParserBase):
""" Parser for arguments coming from plugins """
"""Parser for arguments coming from plugins"""

DESCRIPTION = "Parse plugin CLI arguments and selections"
FPRIME_CHOICES = {
"framing": "fprime",
"communication": "ip",
}

def __init__(self):
""" Initialize the plugin information for this parser """
"""Initialize the plugin information for this parser"""
self._plugin_map = {
category: {
self.get_selection_name(selection): selection for selection in Plugins.system().get_selections(category)
} for category in Plugins.system().get_categories()
self.get_selection_name(selection): selection
for selection in Plugins.system().get_selections(category)
}
for category in Plugins.system().get_categories()
}

def get_arguments(self) -> Dict[Tuple[str, ...], Dict[str, Any]]:
""" Return arguments to used in plugins """
"""Return arguments to used in plugins"""

arguments: Dict[Tuple[str, ...], Dict[str, Any]] = {}
for category, selections in self._plugin_map.items():
arguments.update({
(f"--{category}-selection",): {
"choices": [choice for choice in selections.keys()],
"help": f"Select {category} implementer.",
"default": self.FPRIME_CHOICES.get(category, list(selections.keys())[0])
arguments.update(
{
(f"--{category}-selection",): {
"choices": [choice for choice in selections.keys()],
"help": f"Select {category} implementer.",
"default": self.FPRIME_CHOICES.get(
category, list(selections.keys())[0]
),
}
}
})
)
for selection_name, selection in selections.items():
arguments.update(self.get_selection_arguments(selection))
return arguments

def handle_arguments(self, args, **kwargs):
""" Handles the arguments """
"""Handles the arguments"""
for category, selections in self._plugin_map.items():
selection_string = getattr(args, f"{category}_selection")
selection_class = selections[selection_string]
Expand All @@ -291,24 +300,33 @@ def handle_arguments(self, args, **kwargs):

@staticmethod
def get_selection_name(selection):
""" Get the name of a selection """
return selection.get_name() if hasattr(selection, "get_name") else selection.__name__
"""Get the name of a selection"""
return (
selection.get_name()
if hasattr(selection, "get_name")
else selection.__name__
)

@staticmethod
def get_selection_arguments(selection) -> Dict[Tuple[str, ...], Dict[str, Any]]:
""" Get the name of a selection """
"""Get the name of a selection"""
return selection.get_arguments() if hasattr(selection, "get_arguments") else {}

@staticmethod
def map_selection_arguments(args, selection) -> Dict[str, Any]:
""" Get the name of a selection """
"""Get the name of a selection"""
expected_args = PluginArgumentParser.get_selection_arguments(selection)
argument_destinations = [
value["dest"] if "dest" in value else key[0].replace("--", "").replace("-", "_")
(
value["dest"]
if "dest" in value
else key[0].replace("--", "").replace("-", "_")
)
for key, value in expected_args.items()
]
filled_arguments = {
destination: getattr(args, destination) for destination in argument_destinations
destination: getattr(args, destination)
for destination in argument_destinations
}
# Check arguments or yield a Value error
if hasattr(selection, "check_arguments"):
Expand Down Expand Up @@ -358,7 +376,7 @@ def handle_arguments(self, args, **kwargs):


class CommExtraParser(ParserBase):
""" Parses extra communication arguments """
"""Parses extra communication arguments"""

DESCRIPTION = "Process arguments needed to specify arguments for communication"

Expand Down Expand Up @@ -465,12 +483,6 @@ def get_arguments(self) -> Dict[Tuple[str, ...], Dict[str, Any]]:
"help": "Switch to using the ZMQ transportation layer",
"default": False,
},
("--zmq-server",): {
"dest": "zmq_server",
"action": "store_true",
"help": "Sets the ZMQ connection to be a server. Default: false (client)",
"default": False,
},
("--zmq-transport",): {
"dest": "zmq_transport",
"nargs": 2,
Expand Down Expand Up @@ -646,7 +658,12 @@ def pipeline_factory(args_ns, pipeline=None) -> StandardPipeline:
class CommParser(CompositeParser):
"""Comm Executable Parser"""

CONSTITUENTS = [CommExtraParser, MiddleWareParser, LogDeployParser, PluginArgumentParser]
CONSTITUENTS = [
CommExtraParser,
MiddleWareParser,
LogDeployParser,
PluginArgumentParser,
]

def __init__(self):
"""Initialization"""
Expand Down
11 changes: 5 additions & 6 deletions src/fprime_gds/executables/comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
@author lestarch
"""


import logging
import signal
import sys
Expand Down Expand Up @@ -62,7 +61,10 @@ def main():
client=True,
)
if args.communication_selection == "none":
print("[ERROR] Comm adapter set to 'none'. Nothing to do but exit.", file=sys.stderr)
print(
"[ERROR] Comm adapter set to 'none'. Nothing to do but exit.",
file=sys.stderr,
)
sys.exit(-1)

# Create the handling components for either side of this script, adapter for hardware, and ground for the GDS side
Expand All @@ -71,9 +73,6 @@ def main():
sys.exit(-1)
elif args.zmq:
ground = fprime_gds.common.zmq_transport.ZmqGround(args.zmq_transport)
# Check for need to make this a server
if args.zmq_server:
ground.make_server()
else:
ground = fprime_gds.common.communication.ground.TCPGround(
args.tts_addr, args.tts_port
Expand All @@ -87,7 +86,7 @@ def main():
LOGGER.info(
"Starting uplinker/downlinker connecting to FSW using %s with %s",
args.communication_selection,
args.framing_selection
args.framing_selection,
)
discarded_file_handle = None
try:
Expand Down
Loading
Loading