Skip to content

Commit

Permalink
Output unframed (garbage) data to file (#151)
Browse files Browse the repository at this point in the history
* Allowing output of unframed data

* black formatting

* Calling out error in log pane as error box
  • Loading branch information
LeStarch authored Nov 27, 2023
1 parent b662e64 commit 54532c8
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 69 deletions.
31 changes: 18 additions & 13 deletions src/fprime_gds/common/communication/framing.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ def deframe(self, data, no_copy=False):
"""
Deframes the incoming data from the specified format. Produces exactly one packet, and leftover bytes. Users
wanting all packets to be deframed should call "deframe_all". If no full packet is available, this method
returns None. Expects incoming raw bytes to deframe, and returns a deframed packet or None, and the leftover
bytes that were unused. Will search and discard data up until a start token is found. Note: data will be
consumed up to the first start token found.
returns None. Expects incoming raw bytes to deframe, and returns a deframed packet or None, the leftover
bytes that were unused, and any bytes discarded from the existing data stream. Will search and discard data up
until a start token is found. Note: data will be consumed up to the first start token found.
:param data: framed data bytes
:param no_copy: (optional) will prevent extra copy if True, but "data" input will be destroyed.
:return: (packet as array of bytes or None, leftover bytes)
:return: (packet as array of bytes or None, leftover bytes, any discarded data)
"""

def deframe_all(self, data, no_copy):
Expand All @@ -56,16 +56,18 @@ def deframe_all(self, data, no_copy):
:param data: framed data bytes
:param no_copy: (optional) will prevent extra copy if True, but "data" input will be destroyed.
:return:
:return: list of packets, remaining data, discarded/unframed/garbage data
"""
packets = []
if not no_copy:
data = copy.copy(data)
discarded_aggregate = b""
while True:
# Deframe and return only on None
(packet, data) = self.deframe(data, no_copy=True)
(packet, data, discarded) = self.deframe(data, no_copy=True)
discarded_aggregate += discarded
if packet is None:
return packets, data
return packets, data, discarded_aggregate
packets.append(packet)


Expand Down Expand Up @@ -147,6 +149,7 @@ def deframe(self, data, no_copy=False):
:param no_copy: (optional) will prevent extra copy if True, but "data" input will be destroyed.
:return: (packet as array of bytes or None, leftover bytes)
"""
discarded = b""
if not no_copy:
data = copy.copy(data)
# Continue until there is not enough data for the header, or until a packet is found (return)
Expand All @@ -163,6 +166,7 @@ def deframe(self, data, no_copy=False):
start != FpFramerDeframer.START_TOKEN
or data_size >= FpFramerDeframer.MAXIMUM_DATA_SIZE
):
discarded += data[0:1]
data = data[1:]
continue
# If the pool is large enough to read the whole frame, then read it
Expand All @@ -175,17 +179,18 @@ def deframe(self, data, no_copy=False):
data[: data_size + FpFramerDeframer.HEADER_SIZE]
):
data = data[total_size:]
return deframed, data
return deframed, data, discarded
print(
"[WARNING] Checksum validation failed. Have you correctly set '--comm-checksum-type'",
file=sys.stderr,
)
# Bad checksum, rotate 1 and keep looking for non-garbage
discarded += data[0:1]
data = data[1:]
continue
# Case of not enough data for a full packet, return hoping for more later
return None, data
return None, data
return None, data, discarded
return None, data, discarded


class TcpServerFramerDeframer(FramerDeframer):
Expand Down Expand Up @@ -237,11 +242,11 @@ def deframe(self, data, no_copy=False):
data = data[1:]
# Break out of data when not enough
if len(data) < 8:
return None, data
return None, data, b""
# Read the length and break if not enough data
(data_len,) = struct.unpack_from(">I", data, 4)
if len(data) < data_len + 8:
return None, data
return None, data, b""
packet = data[8 : data_len + 8]
data = data[data_len + 8 :]
return packet, data
return packet, data, b""
2 changes: 1 addition & 1 deletion src/fprime_gds/common/communication/ground.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def receive_all(self):
:return: list deframed packets
"""
self.data += self.tcp.read()
(frames, self.data) = self.deframer.deframe_all(self.data, no_copy=True)
(frames, self.data, _) = self.deframer.deframe_all(self.data, no_copy=True)
return frames

def send_all(self, frames):
Expand Down
31 changes: 26 additions & 5 deletions src/fprime_gds/common/communication/updown.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,23 @@ class Downlinker:
"""

def __init__(
self, adapter: BaseAdapter, ground: GroundHandler, deframer: FramerDeframer
self,
adapter: BaseAdapter,
ground: GroundHandler,
deframer: FramerDeframer,
discarded=None,
):
"""Initialize the downlinker
Constructs a new downlinker object used to run the downlink and deframing operation.
Constructs a new downlinker object used to run the downlink and deframing operation. This downlinker will log
discarded (unframed) data when discarded is a writable data object. When discarded is None the discarded data is
dropped.
Args:
adapter: adapter used to read raw data from the hardware connection
ground: handles the ground side connection
deframer: deframer used to deframe data from the communication format
discarded: file to write discarded data to. None to drop the data.
"""
self.running = True
self.th_ground = None
Expand All @@ -54,13 +61,18 @@ def __init__(
self.ground = ground
self.deframer = deframer
self.outgoing = Queue()
self.discarded = discarded

def start(self):
"""Starts the downlink pipeline"""
self.th_ground = threading.Thread(target=self.sending, name="DownlinkTTSGroundThread")
self.th_ground = threading.Thread(
target=self.sending, name="DownlinkTTSGroundThread"
)
self.th_ground.daemon = True
self.th_ground.start()
self.th_data = threading.Thread(target=self.deframing, name="DownLinkDeframingThread")
self.th_data = threading.Thread(
target=self.deframing, name="DownLinkDeframingThread"
)
self.th_data.daemon = True
self.th_data.start()

Expand All @@ -74,12 +86,20 @@ def deframing(self):
while self.running:
# Blocks until data is available, but may still return b"" if timeout
pool += self.adapter.read()
frames, pool = self.deframer.deframe_all(pool, no_copy=True)
frames, pool, discarded_data = self.deframer.deframe_all(pool, no_copy=True)
try:
for frame in frames:
self.outgoing.put_nowait(frame)
except Full:
DW_LOGGER.warning("GDS ground queue full, dropping frame")
try:
if self.discarded is not None:
self.discarded.write(discarded_data)
self.discarded.flush()
# Failure to write discarded data should never stop the GDS. Log it and move on.
except Exception as exc:
DW_LOGGER.warning("Cannot write discarded data %s", exc)
self.discarded = None # Give up on logging further data

def sending(self):
"""Outgoing stage of downlink
Expand Down Expand Up @@ -107,6 +127,7 @@ def join(self):
for thread in [self.th_data, self.th_ground]:
if thread is not None:
thread.join()
self.discarded = None

def add_loopback_frame(self, frame):
"""Adds a frame to loopback to ground
Expand Down
13 changes: 11 additions & 2 deletions src/fprime_gds/common/logger/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import os
import sys

INITIALIZED = False


def configure_py_log(directory=None, filename=sys.argv[0], mirror_to_stdout=False):
"""
Expand All @@ -21,7 +23,14 @@ def configure_py_log(directory=None, filename=sys.argv[0], mirror_to_stdout=Fals
:param mode: of file to write
:param mirror_to_stdout: mirror the log output to standard our
"""
handlers = [logging.StreamHandler(sys.stdout)] if directory is None or mirror_to_stdout else []
global INITIALIZED
if INITIALIZED:
return
handlers = (
[logging.StreamHandler(sys.stdout)]
if directory is None or mirror_to_stdout
else []
)
if directory is not None:
log_file = os.path.join(directory, os.path.basename(filename))
log_file = log_file if log_file.endswith(".log") else f"{log_file}.log"
Expand All @@ -33,4 +42,4 @@ def configure_py_log(directory=None, filename=sys.argv[0], mirror_to_stdout=Fals
logging.getLogger().addHandler(handler)
logging.getLogger().setLevel(logging.INFO)
logging.info("Logging system initialized!")

INITIALIZED = True
26 changes: 18 additions & 8 deletions src/fprime_gds/executables/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,16 @@ def handle_arguments(self, args, **kwargs):
if likely_deployment.exists():
args.deployment = likely_deployment
return args
child_directories = [child for child in detected_toolchain.iterdir() if child.is_dir()]
child_directories = [
child for child in detected_toolchain.iterdir() if child.is_dir()
]
if not child_directories:
msg = f"No deployments found in {detected_toolchain}. Specify deployment with: --deployment"
raise Exception(msg)
# Works for the old structure where the bin, lib, and dict directories live immediately under the platform
elif len(child_directories) == 3 and set([path.name for path in child_directories]) == {"bin", "lib", "dict"}:
elif len(child_directories) == 3 and set(
[path.name for path in child_directories]
) == {"bin", "lib", "dict"}:
args.deployment = detected_toolchain
return args
elif len(child_directories) > 1:
Expand Down Expand Up @@ -310,8 +314,7 @@ def get_arguments(self) -> Dict[Tuple[str, ...], Dict[str, Any]]:
"action": "store",
"type": str,
"help": "Adapter for communicating to flight deployment. [default: %(default)s]",
"choices": ["none"]
+ list(adapter_definition_dictionaries),
"choices": ["none"] + list(adapter_definition_dictionaries),
"default": "ip",
},
("--comm-checksum-type",): {
Expand All @@ -326,6 +329,15 @@ def get_arguments(self) -> Dict[Tuple[str, ...], Dict[str, Any]]:
],
"default": fprime_gds.common.communication.checksum.CHECKSUM_SELECTION,
},
("--output-unframed-data",): {
"dest": "output_unframed_data",
"action": "store",
"nargs": "?",
"help": "Log unframed data to supplied file relative to log directory. Use '-' for standard out.",
"default": None,
"const": "unframed.log",
"required": False,
},
}
return {**adapter_arguments, **com_arguments}

Expand Down Expand Up @@ -699,9 +711,7 @@ def handle_arguments(self, args, **kwargs):
args.app = Path(args.app) if args.app else Path(find_app(args.deployment))
if not args.app.is_file():
msg = f"F prime binary '{args.app}' does not exist or is not a file"
raise ValueError(
msg
)
raise ValueError(msg)
return args


Expand All @@ -728,7 +738,7 @@ def get_arguments(self) -> Dict[Tuple[str, ...], Dict[str, Any]]:
"action": "store",
"required": False,
"type": int,
"nargs":'+',
"nargs": "+",
"help": f"only show {self.command_name} matching the given type ID(s) 'ID'; can provide multiple IDs to show all given types",
"metavar": "ID",
},
Expand Down
79 changes: 53 additions & 26 deletions src/fprime_gds/executables/comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import signal
import sys

from pathlib import Path

# Required adapters built on standard tools
try:
from fprime_gds.common.zmq_transport import ZmqGround
Expand Down Expand Up @@ -84,34 +86,59 @@ def main():
# Set the framing class used and pass it to the uplink and downlink component constructions giving each a separate
# instantiation
framer_class = FpFramerDeframer
LOGGER.info("Starting uplinker/downlinker connecting to FSW using %s with %s", adapter, framer_class.__name__)
downlinker = Downlinker(adapter, ground, framer_class())
uplinker = Uplinker(adapter, ground, framer_class(), downlinker)

# Open resources for the handlers on either side, this prepares the resources needed for reading/writing data
ground.open()
adapter.open()

# Finally start the processing of uplink and downlink
downlinker.start()
uplinker.start()
LOGGER.debug("Uplinker and downlinker running")

# Wait for shutdown event in the form of a KeyboardInterrupt then stop the processing, close resources, and wait for
# everything to terminate as expected.
def shutdown(*_):
"""Shutdown function for signals"""
uplinker.stop()
downlinker.stop()
LOGGER.info(
"Starting uplinker/downlinker connecting to FSW using %s with %s",
adapter,
framer_class.__name__,
)
discarded_file_handle = None
try:
if args.output_unframed_data == "-":
discarded_file_handle = sys.stdout.buffer
elif args.output_unframed_data is not None:
discarded_file_handle_path = (
Path(args.logs) / Path(args.output_unframed_data)
).resolve()
try:
discarded_file_handle = open(discarded_file_handle_path, "wb")
LOGGER.info("Logging unframed data to %s", discarded_file_handle_path)
except OSError:
LOGGER.warning(
"Failed to open %s. Unframed data will be discarded.",
discarded_file_handle_path,
)
downlinker = Downlinker(
adapter, ground, framer_class(), discarded=discarded_file_handle
)
uplinker = Uplinker(adapter, ground, framer_class(), downlinker)

# Open resources for the handlers on either side, this prepares the resources needed for reading/writing data
ground.open()
adapter.open()

# Finally start the processing of uplink and downlink
downlinker.start()
uplinker.start()
LOGGER.debug("Uplinker and downlinker running")

# Wait for shutdown event in the form of a KeyboardInterrupt then stop the processing, close resources, and wait for
# everything to terminate as expected.
def shutdown(*_):
"""Shutdown function for signals"""
uplinker.stop()
downlinker.stop()
uplinker.join()
downlinker.join()
ground.close()
adapter.close()

signal.signal(signal.SIGTERM, shutdown)
signal.signal(signal.SIGINT, shutdown)
uplinker.join()
downlinker.join()
ground.close()
adapter.close()

signal.signal(signal.SIGTERM, shutdown)
signal.signal(signal.SIGINT, shutdown)
uplinker.join()
downlinker.join()
finally:
if discarded_file_handle is not None and args.output_unframed_data != "-":
discarded_file_handle.close()
return 0


Expand Down
13 changes: 12 additions & 1 deletion src/fprime_gds/flask/static/js/datastore.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,18 @@ class FullListHistory extends ListHistory {
* @param new_items: new items being to be process
*/
send(new_items) {
this.store.splice(0, this.store.length, ...new_items);
// When the lists are not the same, update the stored list otherwise keep the list to prevent unnecessary bound
// data re-rendering.
if (this.store.length !== new_items.length) {
this.store.splice(0, this.store.length, ...new_items);
return;
}
for (let i = 0; i < Math.min(this.store.length, new_items.length); i++) {
if (this.store[i] !== new_items[i]) {
this.store.splice(0, this.store.length, ...new_items);
return;
}
}
}
}

Expand Down
Loading

0 comments on commit 54532c8

Please sign in to comment.