Skip to content

Commit

Permalink
Feat: dynamic frequency (#211)
Browse files Browse the repository at this point in the history
* Clean up this function a little

* Feat: Dynamic Freq Adjust

Sliding Window Implementation: Utilizes a fixed-size buffer (deque) to maintain the most recent frequency readings, enabling real-time calculation of the rolling average and standard deviation.

Dynamic Threshold Calculation: Sets the threshold at the rolling average plus three standard deviations (μ + 3σ), allowing the system to adapt to normal operational fluctuations without manual parameter tuning.

Minimum Threshold Establishment: Introduces a baseline threshold set to 20% above the initial frequency reading to ensure stability during initialization and prevent premature anomaly detection.

Enhanced Logging: Adds detailed debug logs to track frequency statistics and threshold values, facilitating easier monitoring and troubleshooting.

* Update mcu.pyi

* Update mcu.pyi
  • Loading branch information
sbtoonz authored Dec 24, 2024
1 parent 0925988 commit edda8ca
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 10 deletions.
76 changes: 66 additions & 10 deletions scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import msgproto
import numpy as np
import pins
from collections import deque
from clocksync import SecondarySync
from configfile import ConfigWrapper
from gcode import GCodeCommand
Expand Down Expand Up @@ -60,6 +61,10 @@
THRESHOLD_STEP_MULTIPLIER = 10
# Require a qualified threshold to pass at 0.66 of the QUALIFY_SAMPLES
THRESHOLD_ACCEPTANCE_FACTOR = 0.66
SLIDING_WINDOW_SIZE = 50
SIGMA_MULTIPLIER = 3
THRESHOLD_MULTIPLIER = 1.2
SHORTED_COIL_VALUE = 0xFFFFFFF


class TriggerMethod(IntEnum):
Expand Down Expand Up @@ -1714,23 +1719,74 @@ def _is_faulty_coordinate(self, x, y, add_offsets=False):
# Streaming mode

def _check_hardware(self, sample):
# Validate sample input
if "data" not in sample or "freq" not in sample:
raise self._mcu.error("Sample must contain 'data' and 'freq' keys.")

# Initialize variables on the first call
if not hasattr(self, "freq_window"):
self.freq_window = deque(maxlen=SLIDING_WINDOW_SIZE) # Sliding window
self.min_threshold = None # Minimum frequency threshold

# Add the current frequency to the sliding window
freq = sample["freq"]
self.freq_window.append(freq)

# Calculate statistics from the sliding window
if len(self.freq_window) > 1:
freq_window_array = np.array(self.freq_window) # Convert deque to numpy array
f_avg = np.mean(freq_window_array)
f_std = np.std(freq_window_array)
dynamic_threshold = f_avg + SIGMA_MULTIPLIER * f_std # Dynamic threshold
else:
# Fallback during initialization
f_avg = freq
f_std = 0
dynamic_threshold = freq * THRESHOLD_MULTIPLIER # Fallback threshold


# Ensure a minimum threshold is set
if self.min_threshold is None:
self.min_threshold = freq * THRESHOLD_MULTIPLIER # Initial minimum threshold

# Final threshold (whichever is greater: dynamic or minimum)
final_threshold = max(dynamic_threshold, self.min_threshold)

# Debug log for threshold values
logging.debug(
f"Sliding Window Threshold Debug: freq={freq}, f_avg={f_avg}, "
f"f_std={f_std}, dynamic_threshold={dynamic_threshold}, "
f"min_threshold={self.min_threshold}, final_threshold={final_threshold}"
)

# Check for hardware issues
if not self.hardware_failure:
msg = None
if sample["data"] == 0xFFFFFFF:
msg = "coil is shorted or not connected"
elif self.fmin is not None and sample["freq"] > 1.35 * self.fmin:
msg = "coil expected max frequency exceeded"

if sample["data"] == SHORTED_COIL_VALUE:
msg = "Coil is shorted or not connected."
logging.debug(f"Debug: data={sample['data']} indicates connection issue.")
elif freq > final_threshold:
msg = "Coil expected max frequency exceeded (sliding window)."
logging.debug(
f"Frequency {freq} exceeded final threshold {final_threshold}."
)

if msg:
msg = "Scanner hardware issue: " + msg
self.hardware_failure = msg
logging.error(msg)
# Log and handle hardware failure
full_msg = f"Scanner hardware issue: {msg}"
self.hardware_failure = full_msg
logging.error(full_msg)

if self._stream_en:
self.printer.invoke_shutdown(msg)
self.printer.invoke_shutdown(full_msg)
else:
self.gcode.respond_raw("!! " + msg + "\n")
self.gcode.respond_raw(f"!! {full_msg}\n")
elif self._stream_en:
# Handle already detected hardware failure
self.printer.invoke_shutdown(self.hardware_failure)


def _enrich_sample_time(self, sample):
clock = sample["clock"] = self._mcu.clock32_to_clock64(sample["clock"])
sample["time"] = self._mcu.clock_to_print_time(clock)
Expand Down Expand Up @@ -3846,4 +3902,4 @@ def load_config_prefix(config: ConfigWrapper):
scanner.register_model(name, model)
return model
else:
raise config.error("Unknown scanner config directive '%s'" % (name[7:],))
raise config.error("Unknown scanner config directive '%s'" % (name[7:],))
5 changes: 5 additions & 0 deletions typings/mcu.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ class MCUStatus(TypedDict):
class _CommandQueue:
pass

class error(Exception):
pass

class MCU:
_mcu_freq: float
_clocksync: ClockSync
Expand Down Expand Up @@ -59,6 +62,8 @@ class MCU:
pass
def is_fileoutput(self) -> bool:
pass
def error(self, message: str) -> string:
pass

@overload
def get_constant(self, name: str, default: type[sentinel] | str = sentinel) -> str:
Expand Down

0 comments on commit edda8ca

Please sign in to comment.