diff --git a/scripts/firmware.cfg b/scripts/firmware.cfg new file mode 100644 index 0000000..6f5cb42 --- /dev/null +++ b/scripts/firmware.cfg @@ -0,0 +1,4 @@ +KLIPPY_LOG = "~/printer_data/logs/klippy.log" +KATAPULT = "~/katapult" +KLIPPER = "~/klipper" +KLIPPY_ENV = "~/klippy_env" diff --git a/scripts/firmware.py b/scripts/firmware.py index 1082886..65e4b2d 100755 --- a/scripts/firmware.py +++ b/scripts/firmware.py @@ -25,15 +25,10 @@ Union, Tuple, Set, + ClassVar, ) -HOME_PATH = os.path.expanduser("~") -CONFIG_DIR: str = os.path.expanduser("~/printer_data/config") -KLIPPY_LOG: str = os.path.expanduser("~/printer_data/logs/klippy.log") -KLIPPER_DIR: str = os.path.expanduser("~/klipper") -KATAPULT_DIR: str = os.path.expanduser("~/katapult") - -FLASHER_VERSION: str = "0.0.2" +FLASHER_VERSION: str = "0.0.3" PAGE_WIDTH: int = 89 # Default global width @@ -105,12 +100,57 @@ class FirmwareFile(NamedTuple): class Utils: + CONFIG_FILE: ClassVar[str] = "firmware.cfg" + + # Default Directories + DEFAULT_DIRECTORIES: ClassVar[Dict[str, str]] = { + "KLIPPY_LOG": os.path.expanduser("~/printer_data/logs/klippy.log"), + "KATAPULT_DIR": os.path.expanduser("~/katapult"), + "KLIPPER": os.path.expanduser("~/klipper"), + "KLIPPY_ENV": os.path.expanduser("~/klippy_env"), + } + + @classmethod + def load_config(cls) -> Dict[str, str]: + """ + Load variables from the configuration file if it exists. + Fall back to default values if the file is not found. + + Returns: + Dict[str, str]: A dictionary containing the configuration variables. + """ + config_variables: Dict[str, str] = dict(cls.DEFAULT_DIRECTORIES) + logging.debug(f"Loading Config Variables: {config_variables}") + + # Check if the configuration file exists + if os.path.isfile(cls.CONFIG_FILE): + file_variables: Dict[str, str] = {} + with open(cls.CONFIG_FILE, "r") as file: + # Use exec to evaluate the file content + exec(file.read(), {}, file_variables) + + # Expand user (~) in paths + for key, value in file_variables.items(): + if value.startswith("~"): + file_variables[key] = os.path.expanduser(value) + + # Update the default config with values from the file + config_variables.update(file_variables) + + logging.debug(f"Updated Config Variables: {config_variables}") + return config_variables + @staticmethod def configure_logging(): # Get the root logger logger = logging.getLogger() logger.setLevel(logging.DEBUG) # Capture all logs (DEBUG and above) + # Remove all existing handlers to avoid duplicate logs + + if logger.hasHandlers(): + logger.handlers.clear() + # Create a console handler (only active for INFO level messages) console_handler = logging.StreamHandler() console_handler.setLevel(logging.DEBUG) # Set to DEBUG to allow filtering @@ -276,6 +316,19 @@ def is_key_pressed(timeout: int = 1) -> bool: return True return False + @staticmethod + def restart_klipper(): + try: + # Execute the restart command + _ = subprocess.run( + ["sudo", "service", "klipper", "restart"], + check=True, + text=True, + capture_output=True, + ) + except subprocess.CalledProcessError as e: + Utils.error_msg(f"Failed to restart the service ({e.stderr})") + class Menu: title: str @@ -440,13 +493,20 @@ def __init__( self.kseries: bool = kseries self.all: bool = all self.device: Optional[str] = device - self.can = Can(self, debug=self.debug, ftype=self.ftype) - self.usb = Usb(self, debug=self.debug, ftype=self.ftype) + self.config: Dict[str, str] = Utils.load_config() + self.can = Can(self, self.config, debug=self.debug, ftype=self.ftype) + self.usb = Usb(self, self.config, debug=self.debug, ftype=self.ftype) self.dfu = Dfu( self, debug=self.debug, ftype=self.ftype ) # Pass Firmware instance to CAN self.validator: Validator = Validator(self) # Initialize the Validator + self.menu_handlers: Dict[str, Callable[[], None]] = { + FlashMethod.CAN: self.can.menu, + FlashMethod.USB: self.usb.menu, + FlashMethod.DFU: self.dfu.menu, + } + def set_device(self, device: str): logging.debug(f"Device Set: {device}") self.selected_device = device @@ -465,13 +525,8 @@ def handle_initialization(self): """ Handle device initialization based on the flash type and device UUID. """ - handlers: Dict[str, Callable[[], None]] = { - FlashMethod.CAN: self.can.menu, - FlashMethod.USB: self.usb.menu, - FlashMethod.DFU: self.dfu.menu, - } - if self.device and self.flash in handlers: + if self.device and self.flash in self.menu_handlers: # Validate the device if self.validator.validate_device(self.device, self.flash): self.set_device(self.device) @@ -481,7 +536,9 @@ def handle_initialization(self): self.firmware_menu(type=self.flash) # Call the appropriate menu directly from the handlers dictionary - handlers[self.flash]() + self.menu_handlers[self.flash]() + elif self.flash in self.menu_handlers: + self.menu_handlers[self.flash]() else: self.main_menu() @@ -574,7 +631,6 @@ def select_latest(self, firmware_files: List[FirmwareFile], type: FlashMethod): subdirectory, file = latest_firmware_files[0] firmware_path = os.path.join(subdirectory, file) # Construct the full path self.select_firmware(firmware_path, type) - self.main_menu() else: logging.info("No firmware files found in the latest subdirectory.") @@ -640,18 +696,74 @@ def set_custom_branch(self): logging.info("No custom branch provided.") self.branch_menu() - def restart_klipper(self): + def edit_config(self, option: str) -> None: + """ + Display and allow editing of the specified config in the configuration file. + """ + # Check if the config exists in the loaded config + if option not in self.config: + print(f"Config '{option}' is not a recognized configuration key.") + return + + current_value = self.config[option] + print(f"Current value for '{option}': {current_value}") + new_value = input("Enter new value (or press Enter to keep current): ").strip() + + if new_value: + # Update the configuration + self.set_config(option, new_value) + Utils.success_msg(f"Updated '{option}' to '{new_value}'.") + else: + Utils.success_msg(f"'{option}' unchanged.") + self.directory_menu() + + def set_config(self, option: str, value: str) -> None: + """ + Update or add a config value in the configuration file. + """ + # Read current file content or initialize with defaults + if os.path.isfile(Utils.CONFIG_FILE): + with open(Utils.CONFIG_FILE, "r") as file: + lines = file.readlines() + else: + lines = [] + + # Check if the config already exists in the file + for i, line in enumerate(lines): + if line.startswith(f"{option} ="): + # Update the existing line + lines[i] = f'{option} = "{value}"\n' + break + else: + # Add the new option line if not found + lines.append(f'{option} = "{value}"\n') + + # Write updated content back to the file + with open(Utils.CONFIG_FILE, "w") as file: + file.writelines(lines) + + # Update the runtime config + self.config[option] = value + + logging.debug(f"Set {option} to '{value}' in '{Utils.CONFIG_FILE}'.") + + def reset_config(self) -> None: + """ + Reset the configuration file to the default values. + """ try: - # Execute the restart command - _ = subprocess.run( - ["sudo", "service", "klipper", "restart"], - check=True, - text=True, - capture_output=True, - ) - Utils.success_msg("Service restarted successfully!") - except subprocess.CalledProcessError as e: - Utils.error_msg(f"Failed to restart the service ({e.stderr})") + with open(Utils.CONFIG_FILE, "w") as file: + for key, value in Utils.DEFAULT_DIRECTORIES.items(): + _ = file.write(f'{key} = "{value}"\n') + + self.config = dict( + Utils.DEFAULT_DIRECTORIES + ) # Reset runtime config as well + logging.debug(f"Reset configuration to defaults in '{Utils.CONFIG_FILE}'.") + Utils.success_msg("Configuration has been reset to default values.") + except Exception as e: + logging.error(f"Failed to reset configuration: {e}") + Utils.error_msg(f"Error resetting configuration: {e}") # Create main menu def main_menu(self) -> None: @@ -712,6 +824,11 @@ def add_advanced_options( Utils.colored_text("Flash via DFU", Color.MAGENTA), self.dfu.menu ) menu_items[len(menu_items) + 1] = Menu.Separator() + menu_items[len(menu_items) + 1] = Menu.Item( + Utils.colored_text("Set Custom Directories", Color.CYAN), + self.directory_menu, + ) + menu_items[len(menu_items) + 1] = Menu.Separator() menu_items[len(menu_items) + 1] = Menu.Item( Utils.colored_text("Switch Flash Mode", Color.CYAN), self.mode_menu ) @@ -802,6 +919,43 @@ def mode_menu(self): menu = Menu("Select a flashing mode", menu_items) menu.display() + def directory_menu(self): + Utils.header() + menu_items: Dict[int, Union[Menu.Item, Menu.Separator]] = {} + + menu_items[len(menu_items) + 1] = Menu.Item( + "Klippy Env", + lambda: self.edit_config("KLIPPY_ENV"), + ) + menu_items[len(menu_items) + 1] = Menu.Item( + "Klippy Logs", + lambda: self.edit_config("KLIPPY_LOG"), + ) + menu_items[len(menu_items) + 1] = Menu.Item( + "Klipper", + lambda: self.edit_config("KLIPPER"), + ) + menu_items[len(menu_items) + 1] = Menu.Item( + "Katapult", + lambda: self.edit_config("KATAPULT_DIR"), + ) + menu_items[len(menu_items) + 1] = Menu.Separator() + menu_items[len(menu_items) + 1] = Menu.Item( + "Reset to Defaults", lambda: self.reset_config() + ) + menu_items[len(menu_items) + 1] = Menu.Separator() + menu_items[len(menu_items) + 1] = Menu.Item( + Utils.colored_text("Back to Main Menu", Color.CYAN), + self.main_menu, + ) + menu_items[len(menu_items) + 1] = Menu.Separator() + # Add the "Exit" option last + menu_items[0] = Menu.Item("Exit", lambda: exit()) + + # Create and display the menu + menu = Menu("Which Directory do you want to change?", menu_items) + menu.display() + def branch_menu(self): def display_branch_table(): # Table header @@ -901,7 +1055,9 @@ def display_firmware_menu( "Check Again", lambda: self.firmware_menu(type) ) menu_items[len(menu_items) + 1] = Menu.Separator() - menu_items[len(menu_items) + 1] = Menu.Item("Back", self.can.menu) + handler = self.menu_handlers.get(type) + if handler: + menu_items[len(menu_items) + 1] = Menu.Item("Back", lambda: handler()) menu_items[len(menu_items) + 1] = Menu.Item( Utils.colored_text("Back to main menu", Color.CYAN), self.main_menu ) @@ -916,14 +1072,11 @@ def display_firmware_menu( def select_firmware(self, firmware: str, type: FlashMethod): self.set_firmware(firmware) - menu_handlers: Dict[str, Callable[[], None]] = { - FlashMethod.CAN: self.can.menu, - FlashMethod.USB: self.usb.menu, - FlashMethod.DFU: self.dfu.menu, - } + if args.device and args.flash and self.selected_firmware: + self.confirm(type) # Retrieve the appropriate handler and call it if valid - handler = menu_handlers.get(type) + handler = self.menu_handlers.get(type) if handler: handler() # Call the appropriate menu method else: @@ -933,6 +1086,7 @@ def select_firmware(self, firmware: str, type: FlashMethod): def firmware_menu(self, type: FlashMethod): if not type: raise ValueError("type cannot be None or empty") + # Get the bitrate from CAN interface bitrate = self.can.get_bitrate() @@ -1062,11 +1216,16 @@ def flash_success(self, result: str): Utils.page("Flashed Successfully") if self.debug: print(result) - Utils.success_msg("Firmware flashed successfully to device!") + logging.info("Firmware flashed successfully to device!") # Clean the temporary directory if self.retrieve: self.retrieve.clean_temp_dir() - self.main_menu() # Return to the main menu or any other menu + _ = input( + "Press any key and you may be asked for your password in order to restart klipper\n" + + "Please make sure youre not printing when you do this." + ) + Utils.restart_klipper() + exit() # If flash failed def flash_fail(self, message: str): @@ -1077,20 +1236,12 @@ def flash_fail(self, message: str): self.retrieve.clean_temp_dir() Utils.error_msg(message) - # Show what to do next screen - def finished(self): - Utils.header() - _ = input( - "Press any key and you may be asked for your password in order to restart klipper" - + "Please make sure youre not printing when you do this." - ) - self.restart_klipper() - class Can: def __init__( self, firmware: Firmware, + config: Dict[str, str], debug: bool = False, ftype: bool = False, ): @@ -1101,6 +1252,7 @@ def __init__( self.ftype: bool = ftype self.selected_device: Optional[str] = None self.selected_firmware: Optional[str] = None + self.config: Dict[str, str] = config def get_bitrate(self, interface: str = "can0"): try: @@ -1235,7 +1387,7 @@ def query_devices(self): self.menu() return try: - cmd = os.path.expanduser("~/katapult/scripts/flashtool.py") + cmd = os.path.join(self.config["KATAPULT_DIR"], "scripts", "flashtool.py") command = ["python3", cmd, "-i", "can0", "-q"] result = subprocess.run(command, text=True, capture_output=True, check=True) @@ -1313,7 +1465,7 @@ def search_klippy(self) -> None: scanner_uuids: list[str] = [] # UUIDs with [scanner] above them regular_uuids: list[str] = [] # UUIDs without either tag - with open(KLIPPY_LOG, "r") as log_file: + with open(self.config["KLIPPY_LOG"], "r") as log_file: lines = log_file.readlines() # Parse the log to find UUIDs and their contexts @@ -1374,7 +1526,7 @@ def search_klippy(self) -> None: except FileNotFoundError: Utils.error_msg( - f"KLIPPY log file not found at {KLIPPY_LOG}.", + f"KLIPPY log file not found at {self.config['KLIPPY_LOG']}.", ) self.menu() except Exception as e: @@ -1392,7 +1544,7 @@ def flash_device(self, firmware_file: str, device: str): self.validator.check_selected_device() self.validator.check_selected_firmware() # Prepare the command to execute the flash script - cmd: str = os.path.expanduser("~/katapult/scripts/flash_can.py") + cmd = os.path.join(self.config["KATAPULT_DIR"], "scripts", "flash_can.py") command = [ "python3", cmd, @@ -1446,7 +1598,13 @@ def flash_device(self, firmware_file: str, device: str): class Usb: - def __init__(self, firmware: Firmware, debug: bool = False, ftype: bool = False): + def __init__( + self, + firmware: Firmware, + config: Dict[str, str], + debug: bool = False, + ftype: bool = False, + ): self.firmware: Firmware = firmware self.validator: Validator = Validator(firmware) self.katapult: KatapultInstaller = KatapultInstaller() @@ -1454,6 +1612,7 @@ def __init__(self, firmware: Firmware, debug: bool = False, ftype: bool = False) self.ftype: bool = ftype self.selected_device: Optional[str] = None self.selected_firmware: Optional[str] = None + self.config: Dict[str, str] = config def select_device(self, device: str): self.selected_device = device # Save the selected device globally @@ -1522,8 +1681,9 @@ def query_devices(self): def enter_katapult_bootloader(self, device: str): try: device_path = f"/dev/serial/by-id/{device}" + env: str = os.path.join(self.config["KLIPPY_ENV"], "bin", "python") bootloader_cmd = [ - os.path.expanduser("~/klippy-env/bin/python"), + env, "-c", f"import flash_usb as u; u.enter_bootloader('{device_path}')", ] @@ -1534,7 +1694,7 @@ def enter_katapult_bootloader(self, device: str): text=True, capture_output=True, # Captures both stdout and stderr check=True, - cwd=os.path.expanduser("~/klipper/scripts"), + cwd=os.path.join(self.config["KLIPPER"], "scripts"), ) # Log stdout @@ -1641,7 +1801,7 @@ def flash_device(self, firmware_file: str, device: str): return # Prepare the flash command - cmd: str = os.path.expanduser("~/katapult/scripts/flash_can.py") + cmd = os.path.join(self.config["KATAPULT_DIR"], "scripts", "flash_can.py") command = [ "python3", cmd, @@ -2022,10 +2182,12 @@ def main(self): class KatapultInstaller: + config: Dict[str, str] = Utils.load_config() + def create_directory(self) -> bool: - if not os.path.exists(KATAPULT_DIR): + if not os.path.exists(self.config["KATAPULT_DIR"]): try: - os.makedirs(KATAPULT_DIR) + os.makedirs(self.config["KATAPULT_DIR"]) if args.debug: logging.info("Katapult directory created successfully.") else: @@ -2036,7 +2198,7 @@ def create_directory(self) -> bool: return True def clone_repository(self) -> bool: - git_dir = os.path.join(KATAPULT_DIR, ".git") + git_dir = os.path.join(self.config["KATAPULT_DIR"], ".git") if not os.path.exists(git_dir): if args.debug: logging.info( @@ -2052,7 +2214,7 @@ def clone_repository(self) -> bool: "git", "clone", "https://github.com/arksine/katapult", - KATAPULT_DIR, + self.config["KATAPULT_DIR"], ], check=True, ) @@ -2069,7 +2231,14 @@ def clone_repository(self) -> bool: def verify_repository(self) -> bool: try: result = subprocess.run( - ["git", "-C", KATAPULT_DIR, "config", "--get", "remote.origin.url"], + [ + "git", + "-C", + self.config["KATAPULT_DIR"], + "config", + "--get", + "remote.origin.url", + ], text=True, capture_output=True, check=True, @@ -2085,15 +2254,23 @@ def verify_repository(self) -> bool: def check_and_update_repository(self) -> bool: try: - _ = subprocess.run(["git", "-C", KATAPULT_DIR, "fetch"], check=True) + _ = subprocess.run( + ["git", "-C", self.config["KATAPULT_DIR"], "fetch"], check=True + ) local_commit = subprocess.run( - ["git", "-C", KATAPULT_DIR, "rev-parse", "HEAD"], + ["git", "-C", self.config["KATAPULT_DIR"], "rev-parse", "HEAD"], text=True, capture_output=True, check=True, ).stdout.strip() remote_commit = subprocess.run( - ["git", "-C", KATAPULT_DIR, "rev-parse", "origin/master"], + [ + "git", + "-C", + self.config["KATAPULT_DIR"], + "rev-parse", + "origin/master", + ], text=True, capture_output=True, check=True, @@ -2104,7 +2281,10 @@ def check_and_update_repository(self) -> bool: logging.info("The repository is not up to date. Updating...") else: logging.debug("The repository is not up to date. Updating...") - _ = subprocess.run(["git", "-C", KATAPULT_DIR, "pull"], check=True) + _ = subprocess.run( + ["git", "-C", self.config["KATAPULT_DIR"], "pull"], + check=True, + ) if args.debug: logging.info("Repository updated successfully.") else: @@ -2241,7 +2421,6 @@ def install(self) -> None: ) try: args = parser.parse_args(namespace=FirmwareNamespace()) - Utils.configure_logging() logging.debug( "###################################################################################################" @@ -2272,17 +2451,7 @@ def install(self) -> None: ## TODO ## ## Adjust so users cannot be in certain modes together Utils.make_terminal_bigger() - if args.all or args.flash and not args.all: - if args.flash == FlashMethod.CAN: - fw.can.menu() - elif args.flash == FlashMethod.USB: - fw.usb.menu() - elif args.flash == FlashMethod.DFU: - fw.dfu.menu() - else: - fw.main_menu() - else: - fw.handle_initialization() + fw.handle_initialization() except KeyboardInterrupt: print("\nProcess interrupted by user. Exiting...") exit(0)