From 3f09edd5b893ea2dabcbb13ec1054c54fd34941a Mon Sep 17 00:00:00 2001 From: Chris Krause Date: Wed, 11 Dec 2024 03:30:22 +1100 Subject: [PATCH 1/5] adds logging --- scripts/firmware.py | 211 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 165 insertions(+), 46 deletions(-) diff --git a/scripts/firmware.py b/scripts/firmware.py index 3d8e018..020cfd1 100755 --- a/scripts/firmware.py +++ b/scripts/firmware.py @@ -9,6 +9,8 @@ import fnmatch import platform import time +import logging +from logging.handlers import RotatingFileHandler from enum import Enum from time import sleep @@ -102,6 +104,44 @@ class FirmwareFile(NamedTuple): class Utils: + @staticmethod + def configure_logging(): + # Get the root logger + logger = logging.getLogger() + logger.setLevel(logging.DEBUG) # Capture all logs (DEBUG and above) + + # Remove all existing handlers + if logger.hasHandlers(): + logger.handlers.clear() + + # Create a console handler (always active) + console_handler = logging.StreamHandler() + console_handler.setLevel( + logging.INFO + ) # Show only info and above in the console + console_formatter = logging.Formatter( + "%(message)s" + ) # Simple format for console + console_handler.setFormatter(console_formatter) + + # Add console handler to the logger + logger.addHandler(console_handler) + + # Create a rotating file handler (always active) + file_handler = RotatingFileHandler( + "firmware.log", + maxBytes=5 * 1024 * 1024, + backupCount=3, # 5 MB per log + ) + file_handler.setLevel(logging.DEBUG) # Log everything to the file + file_formatter = logging.Formatter( + "%(asctime)s - %(levelname)s - %(message)s" + ) # Detailed format for file + file_handler.setFormatter(file_formatter) + + # Add file handler to the logger + logger.addHandler(file_handler) + @staticmethod def make_terminal_bigger(width: int = 110, height: int = 40): system = platform.system() @@ -163,14 +203,22 @@ def colored_text(text: str, color: Color) -> str: @staticmethod def error_msg(message: str) -> None: - print(Utils.colored_text("Error:", Color.RED), message) + colored_message = Utils.colored_text(f"Error: {message}", Color.RED) + logging.error(colored_message) + print(colored_message) _ = input(Utils.colored_text("\nPress Enter to continue...", Color.YELLOW)) @staticmethod def success_msg(message: str) -> None: - print(Utils.colored_text("Success:", Color.GREEN), message) + colored_message = Utils.colored_text(f"Success: {message}", Color.GREEN) + logging.debug(message) # Log the colored message + print(colored_message) _ = input(Utils.colored_text("\nPress Enter to continue...", Color.YELLOW)) + @staticmethod + def log_debug(message: str) -> None: + logging.debug(message) + @staticmethod def page(title: str, width: int = PAGE_WIDTH) -> None: if len(title) > width: @@ -381,9 +429,11 @@ def __init__( self.validator: Validator = Validator(self) # Initialize the Validator def set_device(self, device: str): + logging.debug(f"Device Set: {device}") self.selected_device = device def set_firmware(self, firmware: str): + logging.debug(f"Firmware Set: {firmware}") self.selected_firmware = firmware def get_device(self) -> Optional[str]: @@ -427,7 +477,7 @@ def find_firmware_files( high_temp: bool = False, ) -> List[FirmwareFile]: if not os.path.isdir(base_dir): - print(f"Base directory does not exist: {base_dir}") + logging.info(f"Base directory does not exist: {base_dir}") return [] firmware_files: List[FirmwareFile] = [] @@ -469,6 +519,8 @@ def find_firmware_files( firmware_files.append( FirmwareFile(subdirectory=subdirectory, filename=file) ) + logging_msg = f"Firmware Found: {subdirectory}/{file}" + logging.debug(logging_msg) return sorted( firmware_files, key=lambda f: f.subdirectory @@ -476,13 +528,13 @@ def find_firmware_files( def select_latest(self, firmware_files: List[FirmwareFile], type: FlashMethod): if not firmware_files: - print("No firmware files found.") + logging.info("No firmware files found.") return # Extract unique subdirectory names subdirectories: Set[str] = {file[0] for file in firmware_files} if not subdirectories: - print("No valid subdirectories found.") + logging.info("No valid subdirectories found.") return latest_subdirectory: str = max( @@ -505,7 +557,7 @@ def select_latest(self, firmware_files: List[FirmwareFile], type: FlashMethod): self.select_firmware(firmware_path, type) self.main_menu() else: - print("No firmware files found in the latest subdirectory.") + logging.info("No firmware files found in the latest subdirectory.") def set_advanced(self): global is_advanced @@ -554,6 +606,7 @@ def set_mode(self, mode: str): def set_branch(self, branch: str): if branch: + logging.debug(f"Branch Changed to : {branch}") self.branch = args.branch = branch else: Utils.error_msg("You didnt specify a branch to use.") @@ -565,7 +618,7 @@ def set_custom_branch(self): if custom_branch: self.set_branch(custom_branch) else: - print("No custom branch provided.") + logging.info("No custom branch provided.") self.branch_menu() def restart_klipper(self): @@ -840,7 +893,7 @@ def display_firmware_menu( menu = Menu("Select Firmware", menu_items) menu.display() else: - print("No firmware files found.") + logging.info("No firmware files found.") def select_firmware(self, firmware: str, type: FlashMethod): self.set_firmware(firmware) @@ -927,6 +980,8 @@ def confirm(self, type: FlashMethod): self.validator.check_selected_device() # Display selected firmware and device + logging.debug(f"Device to Flash: {self.selected_device}") + logging.debug(f"Firmware to Flash: {self.selected_firmware}") print( Utils.colored_text("Device to Flash:", Color.MAGENTA), self.selected_device @@ -1036,6 +1091,7 @@ def get_bitrate(self, interface: str = "can0"): ).read() # Use subprocess for better control in production bitrate_match = re.search(r"bitrate\s(\d+)", result) if bitrate_match: + logging.debug(f"Bitrate: {bitrate_match.group(1)}") return bitrate_match.group(1) else: return None @@ -1181,7 +1237,7 @@ def query_devices(self): .replace("Detected UUID: ", "") .strip() ) - print(uuid) + logging.info(uuid) detected_uuids.append(uuid) print("=" * 40) else: @@ -1339,7 +1395,9 @@ def flash_device(self, firmware_file: str, device: str): # Print stdout as it happens if process.stdout is not None: for line in process.stdout: - print(line.strip()) + line = line.strip() + logging.debug(line) # Log stdout + print(line) # Wait for the process to complete _ = process.wait() @@ -1442,6 +1500,54 @@ def query_devices(self): menu = Menu("Options", menu_items) menu.display() + def enter_katapult_bootloader(self, device: str): + try: + device_path = f"/dev/serial/by-id/{device}" + bootloader_cmd = [ + os.path.expanduser("~/klippy-env/bin/python"), + "-c", + f"import flash_usb as u; u.enter_bootloader('{device_path}')", + ] + + # Run the command and capture its output + result = subprocess.run( + bootloader_cmd, + text=True, + capture_output=True, # Captures both stdout and stderr + check=True, + cwd=os.path.expanduser("~/klipper/scripts"), + ) + + # Log stdout + if result.stdout: + for line in result.stdout.splitlines(): + logging.debug(line) # Log each line to DEBUG + + # Log stderr + if result.stderr: + for line in result.stderr.splitlines(): + logging.debug(line) # Log each line to ERROR + + logging.info( + f"Bootloader command completed successfully for device {device}." + ) + + except subprocess.CalledProcessError as e: + logging.error( + f"Bootloader command failed for device {device}. Return code: {e.returncode}" + ) + if e.stdout: + for line in e.stdout.splitlines(): + logging.debug(line) # Log stdout from the exception + if e.stderr: + for line in e.stderr.splitlines(): + logging.error(line) # Log stderr from the exception + + except Exception as e: + logging.exception( + f"Unexpected error occurred while entering bootloader for device {device}: {e}" + ) + def menu(self) -> None: Utils.header() self.firmware.display_device() @@ -1496,21 +1602,7 @@ def flash_device(self, firmware_file: str, device: str): Utils.error_msg("Your device is not a valid Cartographer device.") self.menu() - # Prepend device path for Cartographer - device = f"/dev/serial/by-id/{device}" - - # Enter bootloader for the device - bootloader_cmd = [ - os.path.expanduser("~/klippy-env/bin/python"), - "-c", - f"import flash_usb as u; u.enter_bootloader('{device}')", - ] - _ = subprocess.run( - bootloader_cmd, - text=True, - check=True, - cwd=os.path.expanduser("~/klipper/scripts"), - ) + self.enter_katapult_bootloader(device) sleep(5) # Perform ls to find Katapult device @@ -1550,7 +1642,9 @@ def flash_device(self, firmware_file: str, device: str): # Print stdout as it happens if process.stdout is not None: for line in process.stdout: - print(line.strip()) + line = line.strip() + logging.debug(line) # Log stdout + print(line) # Wait for the process to complete _ = process.wait() @@ -1593,7 +1687,7 @@ def check_dfu_util(self) -> bool: if shutil.which("dfu-util"): return True else: - print("dfu-util is not installed. Please install it and try again.") + logging.info("dfu-util is not installed. Please install it and try again.") return False def dfu_loop(self) -> List[str]: @@ -1618,19 +1712,19 @@ def dfu_loop(self) -> List[str]: 5 ] # Assuming field 6 contains the ID detected_devices.append(device_id) # Add to the list - print(f"Detected DFU device: {device_id}") + logging.info(f"Detected DFU device: {device_id}") if detected_devices: return detected_devices # Exit both loops immediately - print("No DFU devices found. Retrying in 1 second...") + logging.info("No DFU devices found. Retrying in 1 second...") time.sleep(1) # Wait 1 second before retrying - print("No DFU devices found within the timeout period.") + logging.info("No DFU devices found within the timeout period.") except KeyboardInterrupt: print("\nQuery canceled by user.") return [] except Exception as e: - print(f"Error while querying devices: {e}") + logging.error(f"Error while querying devices: {e}") return [] # Return detected devices to avoid further processing if none found @@ -1770,7 +1864,9 @@ def flash_device(self, firmware_file: str, device: str): # Print stdout as it happens if process.stdout is not None: for line in process.stdout: - print(line.strip()) + line = line.strip() + logging.debug(line) # Log stdout + print(line) # Wait for the process to complete _ = process.wait() @@ -1824,27 +1920,27 @@ def __init__(self, firmware: Firmware, branch: str = "master", debug: bool = Fal def temp_dir_exists(self) -> Optional[str]: if self.debug: - print(f"Checking temporary directory: {self.temp_dir}") + logging.info(f"Checking temporary directory: {self.temp_dir}") if os.path.exists(self.temp_dir): if self.debug: - print(f"Directory exists: {self.temp_dir}") + logging.info(f"Directory exists: {self.temp_dir}") subdirs = [ os.path.join(self.temp_dir, d) for d in os.listdir(self.temp_dir) if os.path.isdir(os.path.join(self.temp_dir, d)) ] if self.debug: - print(f"Subdirectories found: {subdirs}") + logging.info(f"Subdirectories found: {subdirs}") if subdirs: return subdirs[0] if self.debug: - print("No subdirectories found.") + logging.info("No subdirectories found.") return None def clean_temp_dir(self): if os.path.exists(self.temp_dir): if self.debug: - print(f"Cleaning temporary directory: {self.temp_dir}") + logging.info(f"Cleaning temporary directory: {self.temp_dir}") shutil.rmtree(self.temp_dir) os.makedirs(self.temp_dir, exist_ok=True) @@ -1853,7 +1949,7 @@ def download_and_extract(self): # Define the path for the downloaded tarball tarball_path = os.path.join(self.temp_dir, "firmware.tar.gz") - print("Downloading tarball...") + logging.info("Downloading tarball...") # Use curl to save the tarball to a file with open(os.devnull, "w") as devnull: curl_command = [ @@ -1870,7 +1966,7 @@ def download_and_extract(self): check=True, ) - print("Extracting tarball...") + logging.info("Extracting tarball...") # Extract the tarball into the temporary directory with open(os.devnull, "w") as devnull: tar_command = ["tar", "-xz", "-C", self.temp_dir, "-f", tarball_path] @@ -1917,7 +2013,9 @@ def create_directory(self) -> bool: try: os.makedirs(KATAPULT_DIR) if args.debug: - print("Katapult directory created successfully.") + logging.info("Katapult directory created successfully.") + else: + logging.debug("Katapult directory created successfully.") except OSError as e: Utils.error_msg(f"Failed to create directory: {e}") return False @@ -1927,7 +2025,11 @@ def clone_repository(self) -> bool: git_dir = os.path.join(KATAPULT_DIR, ".git") if not os.path.exists(git_dir): if args.debug: - print( + logging.info( + "Directory exists but is not a Git repository. Cloning the repository..." + ) + else: + logging.debug( "Directory exists but is not a Git repository. Cloning the repository..." ) try: @@ -1941,7 +2043,9 @@ def clone_repository(self) -> bool: check=True, ) if args.debug: - print("Repository cloned successfully.") + logging.info("Repository cloned successfully.") + else: + logging.debug("Repository cloned successfully.") return True except subprocess.CalledProcessError as e: Utils.error_msg(f"Failed to clone repository: {e}") @@ -1983,13 +2087,19 @@ def check_and_update_repository(self) -> bool: if local_commit != remote_commit: if args.debug: - print("The repository is not up to date. Updating...") + logging.info("The repository is not up to date. Updating...") + else: + logging.debug("The repository is not up to date. Updating...") _ = subprocess.run(["git", "-C", KATAPULT_DIR, "pull"], check=True) if args.debug: - print("Repository updated successfully.") + logging.info("Repository updated successfully.") + else: + logging.debug("Repository updated successfully.") else: if args.debug: - print("The repository is up to date.") + logging.info("The repository is up to date.") + else: + logging.debug("The repository is up to date.") except subprocess.CalledProcessError as e: Utils.error_msg(f"Git update failed: {e}") return False @@ -2009,7 +2119,9 @@ def install(self) -> bool: return False if args.debug: - print("Katapult check passed.") + logging.info("Katapult check passed.") + else: + logging.debug("Katapult check passed.") return True @@ -2115,6 +2227,13 @@ def install(self) -> None: ) try: args = parser.parse_args(namespace=FirmwareNamespace()) + + Utils.configure_logging() + logging.debug( + "###################################################################################################" + ) + logging.info("Starting firmware flasher...") + logging.debug(f"Arguments: {vars(args)}") # Post-processing arguments # Ensure `args.flash` is a FlashMethod or None if isinstance(args.flash, str): # In case of any external assignment From 7c22c1c6f04db42ad4eb3081a2470cac2f956bc2 Mon Sep 17 00:00:00 2001 From: Chris Krause Date: Wed, 11 Dec 2024 13:12:01 +1100 Subject: [PATCH 2/5] remove clear handlers --- scripts/firmware.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/scripts/firmware.py b/scripts/firmware.py index 020cfd1..f5ed1eb 100755 --- a/scripts/firmware.py +++ b/scripts/firmware.py @@ -110,10 +110,6 @@ def configure_logging(): logger = logging.getLogger() logger.setLevel(logging.DEBUG) # Capture all logs (DEBUG and above) - # Remove all existing handlers - if logger.hasHandlers(): - logger.handlers.clear() - # Create a console handler (always active) console_handler = logging.StreamHandler() console_handler.setLevel( From e9ba442bf6234856249c60719aa50a68d3236780 Mon Sep 17 00:00:00 2001 From: Chris Krause Date: Wed, 11 Dec 2024 13:13:05 +1100 Subject: [PATCH 3/5] fix color to error message --- scripts/firmware.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/firmware.py b/scripts/firmware.py index f5ed1eb..0311079 100755 --- a/scripts/firmware.py +++ b/scripts/firmware.py @@ -200,7 +200,7 @@ def colored_text(text: str, color: Color) -> str: @staticmethod def error_msg(message: str) -> None: colored_message = Utils.colored_text(f"Error: {message}", Color.RED) - logging.error(colored_message) + logging.error(message) print(colored_message) _ = input(Utils.colored_text("\nPress Enter to continue...", Color.YELLOW)) From d16103ff98226bba74bd24b50fa5f9dddc4ecddf Mon Sep 17 00:00:00 2001 From: Chris Krause Date: Wed, 11 Dec 2024 13:15:20 +1100 Subject: [PATCH 4/5] removed logging_debug --- scripts/firmware.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/scripts/firmware.py b/scripts/firmware.py index 0311079..7561185 100755 --- a/scripts/firmware.py +++ b/scripts/firmware.py @@ -211,10 +211,6 @@ def success_msg(message: str) -> None: print(colored_message) _ = input(Utils.colored_text("\nPress Enter to continue...", Color.YELLOW)) - @staticmethod - def log_debug(message: str) -> None: - logging.debug(message) - @staticmethod def page(title: str, width: int = PAGE_WIDTH) -> None: if len(title) > width: From d142a0e76899622da237c5f87d09953ec9125001 Mon Sep 17 00:00:00 2001 From: Chris Krause Date: Wed, 11 Dec 2024 13:31:04 +1100 Subject: [PATCH 5/5] adds INFO only filter --- scripts/firmware.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/scripts/firmware.py b/scripts/firmware.py index 7561185..06e231c 100755 --- a/scripts/firmware.py +++ b/scripts/firmware.py @@ -110,11 +110,17 @@ def configure_logging(): logger = logging.getLogger() logger.setLevel(logging.DEBUG) # Capture all logs (DEBUG and above) - # Create a console handler (always active) + # Create a console handler (only active for INFO level messages) console_handler = logging.StreamHandler() - console_handler.setLevel( - logging.INFO - ) # Show only info and above in the console + console_handler.setLevel(logging.DEBUG) # Set to DEBUG to allow filtering + + # Add a custom filter to only allow INFO messages + class InfoOnlyFilter(logging.Filter): + def filter(self, record: logging.LogRecord) -> bool: + return record.levelno == logging.INFO + + console_handler.addFilter(InfoOnlyFilter()) # Apply the filter + console_formatter = logging.Formatter( "%(message)s" ) # Simple format for console