diff --git a/scripts/firmware.cfg b/scripts/firmware.cfg new file mode 100644 index 0000000..6f5cb42 --- /dev/null +++ b/scripts/firmware.cfg @@ -0,0 +1,4 @@ +KLIPPY_LOG = "~/printer_data/logs/klippy.log" +KATAPULT = "~/katapult" +KLIPPER = "~/klipper" +KLIPPY_ENV = "~/klippy_env" diff --git a/scripts/firmware.py b/scripts/firmware.py index 8b3778b..65e4b2d 100755 --- a/scripts/firmware.py +++ b/scripts/firmware.py @@ -9,6 +9,9 @@ import fnmatch import platform import time +import sys +import logging +from logging.handlers import RotatingFileHandler from enum import Enum from time import sleep @@ -22,15 +25,10 @@ Union, Tuple, Set, + ClassVar, ) -HOME_PATH = os.path.expanduser("~") -CONFIG_DIR: str = os.path.expanduser("~/printer_data/config") -KLIPPY_LOG: str = os.path.expanduser("~/printer_data/logs/klippy.log") -KLIPPER_DIR: str = os.path.expanduser("~/klipper") -KATAPULT_DIR: str = os.path.expanduser("~/katapult") - -FLASHER_VERSION: str = "0.0.2" +FLASHER_VERSION: str = "0.0.3" PAGE_WIDTH: int = 89 # Default global width @@ -102,6 +100,91 @@ class FirmwareFile(NamedTuple): class Utils: + CONFIG_FILE: ClassVar[str] = "firmware.cfg" + + # Default Directories + DEFAULT_DIRECTORIES: ClassVar[Dict[str, str]] = { + "KLIPPY_LOG": os.path.expanduser("~/printer_data/logs/klippy.log"), + "KATAPULT_DIR": os.path.expanduser("~/katapult"), + "KLIPPER": os.path.expanduser("~/klipper"), + "KLIPPY_ENV": os.path.expanduser("~/klippy_env"), + } + + @classmethod + def load_config(cls) -> Dict[str, str]: + """ + Load variables from the configuration file if it exists. + Fall back to default values if the file is not found. + + Returns: + Dict[str, str]: A dictionary containing the configuration variables. + """ + config_variables: Dict[str, str] = dict(cls.DEFAULT_DIRECTORIES) + logging.debug(f"Loading Config Variables: {config_variables}") + + # Check if the configuration file exists + if os.path.isfile(cls.CONFIG_FILE): + file_variables: Dict[str, str] = {} + with open(cls.CONFIG_FILE, "r") as file: + # Use exec to evaluate the file content + exec(file.read(), {}, file_variables) + + # Expand user (~) in paths + for key, value in file_variables.items(): + if value.startswith("~"): + file_variables[key] = os.path.expanduser(value) + + # Update the default config with values from the file + config_variables.update(file_variables) + + logging.debug(f"Updated Config Variables: {config_variables}") + return config_variables + + @staticmethod + def configure_logging(): + # Get the root logger + logger = logging.getLogger() + logger.setLevel(logging.DEBUG) # Capture all logs (DEBUG and above) + + # Remove all existing handlers to avoid duplicate logs + + if logger.hasHandlers(): + logger.handlers.clear() + + # Create a console handler (only active for INFO level messages) + console_handler = logging.StreamHandler() + console_handler.setLevel(logging.DEBUG) # Set to DEBUG to allow filtering + + # Add a custom filter to only allow INFO messages + class InfoOnlyFilter(logging.Filter): + def filter(self, record: logging.LogRecord) -> bool: + return record.levelno == logging.INFO + + console_handler.addFilter(InfoOnlyFilter()) # Apply the filter + + console_formatter = logging.Formatter( + "%(message)s" + ) # Simple format for console + console_handler.setFormatter(console_formatter) + + # Add console handler to the logger + logger.addHandler(console_handler) + + # Create a rotating file handler (always active) + file_handler = RotatingFileHandler( + "firmware.log", + maxBytes=5 * 1024 * 1024, + backupCount=3, # 5 MB per log + ) + file_handler.setLevel(logging.DEBUG) # Log everything to the file + file_formatter = logging.Formatter( + "%(asctime)s - %(levelname)s - %(message)s" + ) # Detailed format for file + file_handler.setFormatter(file_formatter) + + # Add file handler to the logger + logger.addHandler(file_handler) + @staticmethod def make_terminal_bigger(width: int = 110, height: int = 40): system = platform.system() @@ -163,12 +246,16 @@ def colored_text(text: str, color: Color) -> str: @staticmethod def error_msg(message: str) -> None: - print(Utils.colored_text("Error:", Color.RED), message) + colored_message = Utils.colored_text(f"Error: {message}", Color.RED) + logging.error(message) + print(colored_message) _ = input(Utils.colored_text("\nPress Enter to continue...", Color.YELLOW)) @staticmethod def success_msg(message: str) -> None: - print(Utils.colored_text("Success:", Color.GREEN), message) + colored_message = Utils.colored_text(f"Success: {message}", Color.GREEN) + logging.debug(message) # Log the colored message + print(colored_message) _ = input(Utils.colored_text("\nPress Enter to continue...", Color.YELLOW)) @staticmethod @@ -209,6 +296,39 @@ def show_mode(mode: str): mode = mode.center(PAGE_WIDTH) print(Utils.colored_text(mode, Color.RED)) + @staticmethod + def is_key_pressed(timeout: int = 1) -> bool: + if os.name == "nt": # Windows + import msvcrt + + start_time = time.time() + while time.time() - start_time < timeout: + if msvcrt.kbhit(): + _ = msvcrt.getch() # Consume the key press + return True + return False + else: # Unix-based systems + import select + + ready, _, _ = select.select([sys.stdin], [], [], timeout) + if ready: + _ = sys.stdin.read(1) # Consume the key press + return True + return False + + @staticmethod + def restart_klipper(): + try: + # Execute the restart command + _ = subprocess.run( + ["sudo", "service", "klipper", "restart"], + check=True, + text=True, + capture_output=True, + ) + except subprocess.CalledProcessError as e: + Utils.error_msg(f"Failed to restart the service ({e.stderr})") + class Menu: title: str @@ -373,17 +493,26 @@ def __init__( self.kseries: bool = kseries self.all: bool = all self.device: Optional[str] = device - self.can = Can(self, debug=self.debug, ftype=self.ftype) - self.usb = Usb(self, debug=self.debug, ftype=self.ftype) + self.config: Dict[str, str] = Utils.load_config() + self.can = Can(self, self.config, debug=self.debug, ftype=self.ftype) + self.usb = Usb(self, self.config, debug=self.debug, ftype=self.ftype) self.dfu = Dfu( self, debug=self.debug, ftype=self.ftype ) # Pass Firmware instance to CAN self.validator: Validator = Validator(self) # Initialize the Validator + self.menu_handlers: Dict[str, Callable[[], None]] = { + FlashMethod.CAN: self.can.menu, + FlashMethod.USB: self.usb.menu, + FlashMethod.DFU: self.dfu.menu, + } + def set_device(self, device: str): + logging.debug(f"Device Set: {device}") self.selected_device = device def set_firmware(self, firmware: str): + logging.debug(f"Firmware Set: {firmware}") self.selected_firmware = firmware def get_device(self) -> Optional[str]: @@ -396,13 +525,8 @@ def handle_initialization(self): """ Handle device initialization based on the flash type and device UUID. """ - handlers: Dict[str, Callable[[], None]] = { - FlashMethod.CAN: self.can.menu, - FlashMethod.USB: self.usb.menu, - FlashMethod.DFU: self.dfu.menu, - } - if self.device and self.flash in handlers: + if self.device and self.flash in self.menu_handlers: # Validate the device if self.validator.validate_device(self.device, self.flash): self.set_device(self.device) @@ -412,7 +536,9 @@ def handle_initialization(self): self.firmware_menu(type=self.flash) # Call the appropriate menu directly from the handlers dictionary - handlers[self.flash]() + self.menu_handlers[self.flash]() + elif self.flash in self.menu_handlers: + self.menu_handlers[self.flash]() else: self.main_menu() @@ -427,7 +553,7 @@ def find_firmware_files( high_temp: bool = False, ) -> List[FirmwareFile]: if not os.path.isdir(base_dir): - print(f"Base directory does not exist: {base_dir}") + logging.info(f"Base directory does not exist: {base_dir}") return [] firmware_files: List[FirmwareFile] = [] @@ -469,6 +595,8 @@ def find_firmware_files( firmware_files.append( FirmwareFile(subdirectory=subdirectory, filename=file) ) + logging_msg = f"Firmware Found: {subdirectory}/{file}" + logging.debug(logging_msg) return sorted( firmware_files, key=lambda f: f.subdirectory @@ -476,13 +604,13 @@ def find_firmware_files( def select_latest(self, firmware_files: List[FirmwareFile], type: FlashMethod): if not firmware_files: - print("No firmware files found.") + logging.info("No firmware files found.") return # Extract unique subdirectory names subdirectories: Set[str] = {file[0] for file in firmware_files} if not subdirectories: - print("No valid subdirectories found.") + logging.info("No valid subdirectories found.") return latest_subdirectory: str = max( @@ -503,9 +631,8 @@ def select_latest(self, firmware_files: List[FirmwareFile], type: FlashMethod): subdirectory, file = latest_firmware_files[0] firmware_path = os.path.join(subdirectory, file) # Construct the full path self.select_firmware(firmware_path, type) - self.main_menu() else: - print("No firmware files found in the latest subdirectory.") + logging.info("No firmware files found in the latest subdirectory.") def set_advanced(self): global is_advanced @@ -554,6 +681,7 @@ def set_mode(self, mode: str): def set_branch(self, branch: str): if branch: + logging.debug(f"Branch Changed to : {branch}") self.branch = args.branch = branch else: Utils.error_msg("You didnt specify a branch to use.") @@ -565,21 +693,77 @@ def set_custom_branch(self): if custom_branch: self.set_branch(custom_branch) else: - print("No custom branch provided.") + logging.info("No custom branch provided.") self.branch_menu() - def restart_klipper(self): + def edit_config(self, option: str) -> None: + """ + Display and allow editing of the specified config in the configuration file. + """ + # Check if the config exists in the loaded config + if option not in self.config: + print(f"Config '{option}' is not a recognized configuration key.") + return + + current_value = self.config[option] + print(f"Current value for '{option}': {current_value}") + new_value = input("Enter new value (or press Enter to keep current): ").strip() + + if new_value: + # Update the configuration + self.set_config(option, new_value) + Utils.success_msg(f"Updated '{option}' to '{new_value}'.") + else: + Utils.success_msg(f"'{option}' unchanged.") + self.directory_menu() + + def set_config(self, option: str, value: str) -> None: + """ + Update or add a config value in the configuration file. + """ + # Read current file content or initialize with defaults + if os.path.isfile(Utils.CONFIG_FILE): + with open(Utils.CONFIG_FILE, "r") as file: + lines = file.readlines() + else: + lines = [] + + # Check if the config already exists in the file + for i, line in enumerate(lines): + if line.startswith(f"{option} ="): + # Update the existing line + lines[i] = f'{option} = "{value}"\n' + break + else: + # Add the new option line if not found + lines.append(f'{option} = "{value}"\n') + + # Write updated content back to the file + with open(Utils.CONFIG_FILE, "w") as file: + file.writelines(lines) + + # Update the runtime config + self.config[option] = value + + logging.debug(f"Set {option} to '{value}' in '{Utils.CONFIG_FILE}'.") + + def reset_config(self) -> None: + """ + Reset the configuration file to the default values. + """ try: - # Execute the restart command - _ = subprocess.run( - ["sudo", "service", "klipper", "restart"], - check=True, - text=True, - capture_output=True, - ) - Utils.success_msg("Service restarted successfully!") - except subprocess.CalledProcessError as e: - Utils.error_msg(f"Failed to restart the service ({e.stderr})") + with open(Utils.CONFIG_FILE, "w") as file: + for key, value in Utils.DEFAULT_DIRECTORIES.items(): + _ = file.write(f'{key} = "{value}"\n') + + self.config = dict( + Utils.DEFAULT_DIRECTORIES + ) # Reset runtime config as well + logging.debug(f"Reset configuration to defaults in '{Utils.CONFIG_FILE}'.") + Utils.success_msg("Configuration has been reset to default values.") + except Exception as e: + logging.error(f"Failed to reset configuration: {e}") + Utils.error_msg(f"Error resetting configuration: {e}") # Create main menu def main_menu(self) -> None: @@ -640,6 +824,11 @@ def add_advanced_options( Utils.colored_text("Flash via DFU", Color.MAGENTA), self.dfu.menu ) menu_items[len(menu_items) + 1] = Menu.Separator() + menu_items[len(menu_items) + 1] = Menu.Item( + Utils.colored_text("Set Custom Directories", Color.CYAN), + self.directory_menu, + ) + menu_items[len(menu_items) + 1] = Menu.Separator() menu_items[len(menu_items) + 1] = Menu.Item( Utils.colored_text("Switch Flash Mode", Color.CYAN), self.mode_menu ) @@ -730,6 +919,43 @@ def mode_menu(self): menu = Menu("Select a flashing mode", menu_items) menu.display() + def directory_menu(self): + Utils.header() + menu_items: Dict[int, Union[Menu.Item, Menu.Separator]] = {} + + menu_items[len(menu_items) + 1] = Menu.Item( + "Klippy Env", + lambda: self.edit_config("KLIPPY_ENV"), + ) + menu_items[len(menu_items) + 1] = Menu.Item( + "Klippy Logs", + lambda: self.edit_config("KLIPPY_LOG"), + ) + menu_items[len(menu_items) + 1] = Menu.Item( + "Klipper", + lambda: self.edit_config("KLIPPER"), + ) + menu_items[len(menu_items) + 1] = Menu.Item( + "Katapult", + lambda: self.edit_config("KATAPULT_DIR"), + ) + menu_items[len(menu_items) + 1] = Menu.Separator() + menu_items[len(menu_items) + 1] = Menu.Item( + "Reset to Defaults", lambda: self.reset_config() + ) + menu_items[len(menu_items) + 1] = Menu.Separator() + menu_items[len(menu_items) + 1] = Menu.Item( + Utils.colored_text("Back to Main Menu", Color.CYAN), + self.main_menu, + ) + menu_items[len(menu_items) + 1] = Menu.Separator() + # Add the "Exit" option last + menu_items[0] = Menu.Item("Exit", lambda: exit()) + + # Create and display the menu + menu = Menu("Which Directory do you want to change?", menu_items) + menu.display() + def branch_menu(self): def display_branch_table(): # Table header @@ -829,7 +1055,9 @@ def display_firmware_menu( "Check Again", lambda: self.firmware_menu(type) ) menu_items[len(menu_items) + 1] = Menu.Separator() - menu_items[len(menu_items) + 1] = Menu.Item("Back", self.can.menu) + handler = self.menu_handlers.get(type) + if handler: + menu_items[len(menu_items) + 1] = Menu.Item("Back", lambda: handler()) menu_items[len(menu_items) + 1] = Menu.Item( Utils.colored_text("Back to main menu", Color.CYAN), self.main_menu ) @@ -840,18 +1068,15 @@ def display_firmware_menu( menu = Menu("Select Firmware", menu_items) menu.display() else: - print("No firmware files found.") + logging.info("No firmware files found.") def select_firmware(self, firmware: str, type: FlashMethod): self.set_firmware(firmware) - menu_handlers: Dict[str, Callable[[], None]] = { - FlashMethod.CAN: self.can.menu, - FlashMethod.USB: self.usb.menu, - FlashMethod.DFU: self.dfu.menu, - } + if args.device and args.flash and self.selected_firmware: + self.confirm(type) # Retrieve the appropriate handler and call it if valid - handler = menu_handlers.get(type) + handler = self.menu_handlers.get(type) if handler: handler() # Call the appropriate menu method else: @@ -861,6 +1086,7 @@ def select_firmware(self, firmware: str, type: FlashMethod): def firmware_menu(self, type: FlashMethod): if not type: raise ValueError("type cannot be None or empty") + # Get the bitrate from CAN interface bitrate = self.can.get_bitrate() @@ -927,6 +1153,8 @@ def confirm(self, type: FlashMethod): self.validator.check_selected_device() # Display selected firmware and device + logging.debug(f"Device to Flash: {self.selected_device}") + logging.debug(f"Firmware to Flash: {self.selected_firmware}") print( Utils.colored_text("Device to Flash:", Color.MAGENTA), self.selected_device @@ -988,11 +1216,16 @@ def flash_success(self, result: str): Utils.page("Flashed Successfully") if self.debug: print(result) - Utils.success_msg("Firmware flashed successfully to device!") + logging.info("Firmware flashed successfully to device!") # Clean the temporary directory if self.retrieve: self.retrieve.clean_temp_dir() - self.main_menu() # Return to the main menu or any other menu + _ = input( + "Press any key and you may be asked for your password in order to restart klipper\n" + + "Please make sure youre not printing when you do this." + ) + Utils.restart_klipper() + exit() # If flash failed def flash_fail(self, message: str): @@ -1003,20 +1236,12 @@ def flash_fail(self, message: str): self.retrieve.clean_temp_dir() Utils.error_msg(message) - # Show what to do next screen - def finished(self): - Utils.header() - _ = input( - "Press any key and you may be asked for your password in order to restart klipper" - + "Please make sure youre not printing when you do this." - ) - self.restart_klipper() - class Can: def __init__( self, firmware: Firmware, + config: Dict[str, str], debug: bool = False, ftype: bool = False, ): @@ -1027,6 +1252,7 @@ def __init__( self.ftype: bool = ftype self.selected_device: Optional[str] = None self.selected_firmware: Optional[str] = None + self.config: Dict[str, str] = config def get_bitrate(self, interface: str = "can0"): try: @@ -1036,6 +1262,7 @@ def get_bitrate(self, interface: str = "can0"): ).read() # Use subprocess for better control in production bitrate_match = re.search(r"bitrate\s(\d+)", result) if bitrate_match: + logging.debug(f"Bitrate: {bitrate_match.group(1)}") return bitrate_match.group(1) else: return None @@ -1160,7 +1387,7 @@ def query_devices(self): self.menu() return try: - cmd = os.path.expanduser("~/katapult/scripts/flashtool.py") + cmd = os.path.join(self.config["KATAPULT_DIR"], "scripts", "flashtool.py") command = ["python3", cmd, "-i", "can0", "-q"] result = subprocess.run(command, text=True, capture_output=True, check=True) @@ -1181,21 +1408,21 @@ def query_devices(self): .replace("Detected UUID: ", "") .strip() ) - print(uuid) + logging.info(uuid) detected_uuids.append(uuid) print("=" * 40) else: Utils.error_msg("No CAN devices found.") else: Utils.error_msg("Unexpected output format.") - self.menu() + return except subprocess.CalledProcessError as e: Utils.error_msg(f"Error querying CAN devices: {e}") - self.menu() + return except Exception as e: Utils.error_msg(f"Unexpected error: {e}") - self.menu() + return finally: # Define menu items, starting with UUID options menu_items: Dict[int, Union[Menu.Item, Menu.Separator]] = {} @@ -1238,7 +1465,7 @@ def search_klippy(self) -> None: scanner_uuids: list[str] = [] # UUIDs with [scanner] above them regular_uuids: list[str] = [] # UUIDs without either tag - with open(KLIPPY_LOG, "r") as log_file: + with open(self.config["KLIPPY_LOG"], "r") as log_file: lines = log_file.readlines() # Parse the log to find UUIDs and their contexts @@ -1299,7 +1526,7 @@ def search_klippy(self) -> None: except FileNotFoundError: Utils.error_msg( - f"KLIPPY log file not found at {KLIPPY_LOG}.", + f"KLIPPY log file not found at {self.config['KLIPPY_LOG']}.", ) self.menu() except Exception as e: @@ -1317,7 +1544,7 @@ def flash_device(self, firmware_file: str, device: str): self.validator.check_selected_device() self.validator.check_selected_firmware() # Prepare the command to execute the flash script - cmd: str = os.path.expanduser("~/katapult/scripts/flash_can.py") + cmd = os.path.join(self.config["KATAPULT_DIR"], "scripts", "flash_can.py") command = [ "python3", cmd, @@ -1339,7 +1566,9 @@ def flash_device(self, firmware_file: str, device: str): # Print stdout as it happens if process.stdout is not None: for line in process.stdout: - print(line.strip()) + line = line.strip() + logging.debug(line) # Log stdout + print(line) # Wait for the process to complete _ = process.wait() @@ -1369,7 +1598,13 @@ def flash_device(self, firmware_file: str, device: str): class Usb: - def __init__(self, firmware: Firmware, debug: bool = False, ftype: bool = False): + def __init__( + self, + firmware: Firmware, + config: Dict[str, str], + debug: bool = False, + ftype: bool = False, + ): self.firmware: Firmware = firmware self.validator: Validator = Validator(firmware) self.katapult: KatapultInstaller = KatapultInstaller() @@ -1377,6 +1612,7 @@ def __init__(self, firmware: Firmware, debug: bool = False, ftype: bool = False) self.ftype: bool = ftype self.selected_device: Optional[str] = None self.selected_firmware: Optional[str] = None + self.config: Dict[str, str] = config def select_device(self, device: str): self.selected_device = device # Save the selected device globally @@ -1389,7 +1625,6 @@ def query_devices(self): if not self.katapult.install(): Utils.error_msg("Error with Katapult") - self.menu() return detected_devices: List[str] = [] try: @@ -1397,7 +1632,7 @@ def query_devices(self): base_path = "/dev/serial/by-id/" if not os.path.exists(base_path): Utils.error_msg(f"Path '{base_path}' does not exist.") - self.menu() + return for device in os.listdir(base_path): if "Cartographer" in device or "katapult" in device: @@ -1407,7 +1642,6 @@ def query_devices(self): Utils.error_msg( "No devices containing 'Cartographer' or 'katapult' found." ) - self.menu() return # Display the detected devices @@ -1419,7 +1653,7 @@ def query_devices(self): except Exception as e: Utils.error_msg(f"Unexpected error while querying devices: {e}") - self.menu() + return # Define menu items, starting with detected devices menu_items: Dict[int, Union[Menu.Item, Menu.Separator]] = {} @@ -1444,6 +1678,55 @@ def query_devices(self): menu = Menu("Options", menu_items) menu.display() + def enter_katapult_bootloader(self, device: str): + try: + device_path = f"/dev/serial/by-id/{device}" + env: str = os.path.join(self.config["KLIPPY_ENV"], "bin", "python") + bootloader_cmd = [ + env, + "-c", + f"import flash_usb as u; u.enter_bootloader('{device_path}')", + ] + + # Run the command and capture its output + result = subprocess.run( + bootloader_cmd, + text=True, + capture_output=True, # Captures both stdout and stderr + check=True, + cwd=os.path.join(self.config["KLIPPER"], "scripts"), + ) + + # Log stdout + if result.stdout: + for line in result.stdout.splitlines(): + logging.debug(line) # Log each line to DEBUG + + # Log stderr + if result.stderr: + for line in result.stderr.splitlines(): + logging.debug(line) # Log each line to ERROR + + logging.info( + f"Bootloader command completed successfully for device {device}." + ) + + except subprocess.CalledProcessError as e: + logging.error( + f"Bootloader command failed for device {device}. Return code: {e.returncode}" + ) + if e.stdout: + for line in e.stdout.splitlines(): + logging.debug(line) # Log stdout from the exception + if e.stderr: + for line in e.stderr.splitlines(): + logging.error(line) # Log stderr from the exception + + except Exception as e: + logging.exception( + f"Unexpected error occurred while entering bootloader for device {device}: {e}" + ) + def menu(self) -> None: Utils.header() self.firmware.display_device() @@ -1498,21 +1781,7 @@ def flash_device(self, firmware_file: str, device: str): Utils.error_msg("Your device is not a valid Cartographer device.") self.menu() - # Prepend device path for Cartographer - device = f"/dev/serial/by-id/{device}" - - # Enter bootloader for the device - bootloader_cmd = [ - os.path.expanduser("~/klippy-env/bin/python"), - "-c", - f"import flash_usb as u; u.enter_bootloader('{device}')", - ] - _ = subprocess.run( - bootloader_cmd, - text=True, - check=True, - cwd=os.path.expanduser("~/klipper/scripts"), - ) + self.enter_katapult_bootloader(device) sleep(5) # Perform ls to find Katapult device @@ -1532,7 +1801,7 @@ def flash_device(self, firmware_file: str, device: str): return # Prepare the flash command - cmd: str = os.path.expanduser("~/katapult/scripts/flash_can.py") + cmd = os.path.join(self.config["KATAPULT_DIR"], "scripts", "flash_can.py") command = [ "python3", cmd, @@ -1552,7 +1821,9 @@ def flash_device(self, firmware_file: str, device: str): # Print stdout as it happens if process.stdout is not None: for line in process.stdout: - print(line.strip()) + line = line.strip() + logging.debug(line) # Log stdout + print(line) # Wait for the process to complete _ = process.wait() @@ -1595,48 +1866,43 @@ def check_dfu_util(self) -> bool: if shutil.which("dfu-util"): return True else: - print("dfu-util is not installed. Please install it and try again.") + logging.info("dfu-util is not installed. Please install it and try again.") return False def dfu_loop(self) -> List[str]: - start_time = time.time() - timeout = 30 # Timeout in seconds - detected_devices: List[str] = [] + print("Press any key to stop...\n") try: - while time.time() - start_time < timeout: + while True: # Run the `lsusb` command result = subprocess.run( ["lsusb"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) lines = result.stdout.splitlines() - # Parse all lines containing "DFU Mode" + # Check for DFU Mode in the output for line in lines: if "DFU Mode" in line: - # Extract the device ID (the 6th field in `lsusb` output) - device_id: str = line.split()[ - 5 - ] # Assuming field 6 contains the ID - detected_devices.append(device_id) # Add to the list - print(f"Detected DFU device: {device_id}") - if detected_devices: - return detected_devices # Exit both loops immediately - - print("No DFU devices found. Retrying in 1 second...") - time.sleep(1) # Wait 1 second before retrying - - print("No DFU devices found within the timeout period.") + device_id = line.split()[5] # Extract device ID (6th field) + detected_devices.append(device_id) + logging.info(f"DFU device found: {device_id}") + _ = input("Press any key to return to the main menu.") + return detected_devices # Exit the loop and return devices + + logging.info( + "DFU device not found, checking again... Press any key to return to the main menu." + ) + + # Check for key press with a timeout of 2 seconds + if Utils.is_key_pressed(timeout=2): + return detected_devices + except KeyboardInterrupt: - print("\nQuery canceled by user.") - return [] + return detected_devices except Exception as e: - print(f"Error while querying devices: {e}") - return [] - - # Return detected devices to avoid further processing if none found - return detected_devices + logging.error(f"Error: {e}") + return detected_devices def query_devices(self): Utils.header() @@ -1772,7 +2038,9 @@ def flash_device(self, firmware_file: str, device: str): # Print stdout as it happens if process.stdout is not None: for line in process.stdout: - print(line.strip()) + line = line.strip() + logging.debug(line) # Log stdout + print(line) # Wait for the process to complete _ = process.wait() @@ -1826,27 +2094,27 @@ def __init__(self, firmware: Firmware, branch: str = "master", debug: bool = Fal def temp_dir_exists(self) -> Optional[str]: if self.debug: - print(f"Checking temporary directory: {self.temp_dir}") + logging.info(f"Checking temporary directory: {self.temp_dir}") if os.path.exists(self.temp_dir): if self.debug: - print(f"Directory exists: {self.temp_dir}") + logging.info(f"Directory exists: {self.temp_dir}") subdirs = [ os.path.join(self.temp_dir, d) for d in os.listdir(self.temp_dir) if os.path.isdir(os.path.join(self.temp_dir, d)) ] if self.debug: - print(f"Subdirectories found: {subdirs}") + logging.info(f"Subdirectories found: {subdirs}") if subdirs: return subdirs[0] if self.debug: - print("No subdirectories found.") + logging.info("No subdirectories found.") return None def clean_temp_dir(self): if os.path.exists(self.temp_dir): if self.debug: - print(f"Cleaning temporary directory: {self.temp_dir}") + logging.info(f"Cleaning temporary directory: {self.temp_dir}") shutil.rmtree(self.temp_dir) os.makedirs(self.temp_dir, exist_ok=True) @@ -1855,7 +2123,7 @@ def download_and_extract(self): # Define the path for the downloaded tarball tarball_path = os.path.join(self.temp_dir, "firmware.tar.gz") - print("Downloading tarball...") + logging.info("Downloading tarball...") # Use curl to save the tarball to a file with open(os.devnull, "w") as devnull: curl_command = [ @@ -1872,7 +2140,7 @@ def download_and_extract(self): check=True, ) - print("Extracting tarball...") + logging.info("Extracting tarball...") # Extract the tarball into the temporary directory with open(os.devnull, "w") as devnull: tar_command = ["tar", "-xz", "-C", self.temp_dir, "-f", tarball_path] @@ -1914,22 +2182,30 @@ def main(self): class KatapultInstaller: + config: Dict[str, str] = Utils.load_config() + def create_directory(self) -> bool: - if not os.path.exists(KATAPULT_DIR): + if not os.path.exists(self.config["KATAPULT_DIR"]): try: - os.makedirs(KATAPULT_DIR) + os.makedirs(self.config["KATAPULT_DIR"]) if args.debug: - print("Katapult directory created successfully.") + logging.info("Katapult directory created successfully.") + else: + logging.debug("Katapult directory created successfully.") except OSError as e: Utils.error_msg(f"Failed to create directory: {e}") return False return True def clone_repository(self) -> bool: - git_dir = os.path.join(KATAPULT_DIR, ".git") + git_dir = os.path.join(self.config["KATAPULT_DIR"], ".git") if not os.path.exists(git_dir): if args.debug: - print( + logging.info( + "Directory exists but is not a Git repository. Cloning the repository..." + ) + else: + logging.debug( "Directory exists but is not a Git repository. Cloning the repository..." ) try: @@ -1938,12 +2214,14 @@ def clone_repository(self) -> bool: "git", "clone", "https://github.com/arksine/katapult", - KATAPULT_DIR, + self.config["KATAPULT_DIR"], ], check=True, ) if args.debug: - print("Repository cloned successfully.") + logging.info("Repository cloned successfully.") + else: + logging.debug("Repository cloned successfully.") return True except subprocess.CalledProcessError as e: Utils.error_msg(f"Failed to clone repository: {e}") @@ -1953,7 +2231,14 @@ def clone_repository(self) -> bool: def verify_repository(self) -> bool: try: result = subprocess.run( - ["git", "-C", KATAPULT_DIR, "config", "--get", "remote.origin.url"], + [ + "git", + "-C", + self.config["KATAPULT_DIR"], + "config", + "--get", + "remote.origin.url", + ], text=True, capture_output=True, check=True, @@ -1969,15 +2254,23 @@ def verify_repository(self) -> bool: def check_and_update_repository(self) -> bool: try: - _ = subprocess.run(["git", "-C", KATAPULT_DIR, "fetch"], check=True) + _ = subprocess.run( + ["git", "-C", self.config["KATAPULT_DIR"], "fetch"], check=True + ) local_commit = subprocess.run( - ["git", "-C", KATAPULT_DIR, "rev-parse", "HEAD"], + ["git", "-C", self.config["KATAPULT_DIR"], "rev-parse", "HEAD"], text=True, capture_output=True, check=True, ).stdout.strip() remote_commit = subprocess.run( - ["git", "-C", KATAPULT_DIR, "rev-parse", "origin/master"], + [ + "git", + "-C", + self.config["KATAPULT_DIR"], + "rev-parse", + "origin/master", + ], text=True, capture_output=True, check=True, @@ -1985,13 +2278,22 @@ def check_and_update_repository(self) -> bool: if local_commit != remote_commit: if args.debug: - print("The repository is not up to date. Updating...") - _ = subprocess.run(["git", "-C", KATAPULT_DIR, "pull"], check=True) + logging.info("The repository is not up to date. Updating...") + else: + logging.debug("The repository is not up to date. Updating...") + _ = subprocess.run( + ["git", "-C", self.config["KATAPULT_DIR"], "pull"], + check=True, + ) if args.debug: - print("Repository updated successfully.") + logging.info("Repository updated successfully.") + else: + logging.debug("Repository updated successfully.") else: if args.debug: - print("The repository is up to date.") + logging.info("The repository is up to date.") + else: + logging.debug("The repository is up to date.") except subprocess.CalledProcessError as e: Utils.error_msg(f"Git update failed: {e}") return False @@ -2011,7 +2313,9 @@ def install(self) -> bool: return False if args.debug: - print("Katapult check passed.") + logging.info("Katapult check passed.") + else: + logging.debug("Katapult check passed.") return True @@ -2117,6 +2421,12 @@ def install(self) -> None: ) try: args = parser.parse_args(namespace=FirmwareNamespace()) + Utils.configure_logging() + logging.debug( + "###################################################################################################" + ) + logging.info("Starting firmware flasher...") + logging.debug(f"Arguments: {vars(args)}") # Post-processing arguments # Ensure `args.flash` is a FlashMethod or None if isinstance(args.flash, str): # In case of any external assignment @@ -2141,17 +2451,7 @@ def install(self) -> None: ## TODO ## ## Adjust so users cannot be in certain modes together Utils.make_terminal_bigger() - if args.all or args.flash and not args.all: - if args.flash == FlashMethod.CAN: - fw.can.menu() - elif args.flash == FlashMethod.USB: - fw.usb.menu() - elif args.flash == FlashMethod.DFU: - fw.dfu.menu() - else: - fw.main_menu() - else: - fw.handle_initialization() + fw.handle_initialization() except KeyboardInterrupt: print("\nProcess interrupted by user. Exiting...") exit(0)