diff --git a/.github/workflows/release_linux_modern.yml b/.github/workflows/release_linux_modern.yml index a30072e5..483f8d8d 100644 --- a/.github/workflows/release_linux_modern.yml +++ b/.github/workflows/release_linux_modern.yml @@ -1,4 +1,4 @@ -name: Linux Release +name: Modern Linux Release on: push: @@ -33,5 +33,5 @@ jobs: - name: Archive production artifacts uses: actions/upload-artifact@v1 with: - name: NanoVNASaver.linux + name: NanoVNASaver.linux_modern path: dist/nanovna-saver diff --git a/NanoVNASaver/Analysis/PeakSearchAnalysis.py b/NanoVNASaver/Analysis/PeakSearchAnalysis.py index 8d3d91e1..7f23aa4a 100644 --- a/NanoVNASaver/Analysis/PeakSearchAnalysis.py +++ b/NanoVNASaver/Analysis/PeakSearchAnalysis.py @@ -19,8 +19,8 @@ import logging from PyQt5 import QtWidgets -import scipy import numpy as np +from scipy.signal import find_peaks, peak_prominences from NanoVNASaver.Analysis.Base import QHLine from NanoVNASaver.Analysis.SimplePeakSearchAnalysis import ( @@ -60,17 +60,17 @@ def runAnalysis(self): inverted = False if self.button['peak_l'].isChecked(): inverted = True - peaks, _ = scipy.signal.find_peaks( + peaks, _ = find_peaks( -np.array(data), width=3, distance=3, prominence=1) else: self.button['peak_h'].setChecked(True) - peaks, _ = scipy.signal.find_peaks( + peaks, _ = find_peaks( data, width=3, distance=3, prominence=1) # Having found the peaks, get the prominence data for i, p in np.ndenumerate(peaks): logger.debug("Peak %i at %d", i, p) - prominences = scipy.signal.peak_prominences(data, peaks)[0] + prominences = peak_prominences(data, peaks)[0] logger.debug("%d prominences", len(prominences)) # Find the peaks with the most extreme values diff --git a/NanoVNASaver/AnalyticTools.py b/NanoVNASaver/AnalyticTools.py index c85b4f41..34b71385 100644 --- a/NanoVNASaver/AnalyticTools.py +++ b/NanoVNASaver/AnalyticTools.py @@ -21,7 +21,7 @@ from typing import Callable, List, Tuple import numpy as np -import scipy +from scipy.signal import find_peaks from NanoVNASaver.RFTools import Datapoint @@ -60,7 +60,7 @@ def maxima(data: List[float], threshold: float = 0.0) -> List[int]: Returns: List[int]: indices of maxima """ - peaks = scipy.signal.find_peaks( + peaks = find_peaks( data, width=2, distance=3, prominence=1)[0].tolist() return [ i for i in peaks if data[i] > threshold @@ -76,7 +76,7 @@ def minima(data: List[float], threshold: float = 0.0) -> List[int]: Returns: List[int]: indices of minima """ - bottoms = scipy.signal.find_peaks( + bottoms = find_peaks( -np.array(data), width=2, distance=3, prominence=1)[0].tolist() return [ i for i in bottoms if data[i] < threshold diff --git a/NanoVNASaver/Calibration.py b/NanoVNASaver/Calibration.py index 51002107..beb35c93 100644 --- a/NanoVNASaver/Calibration.py +++ b/NanoVNASaver/Calibration.py @@ -22,21 +22,51 @@ import os import re from collections import defaultdict, UserDict +from dataclasses import dataclass from typing import List from scipy.interpolate import interp1d from NanoVNASaver.RFTools import Datapoint -RXP_CAL_LINE = re.compile(r"""^\s* - (?P\d+) \s+ - (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) \s+ - (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) \s+ - (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+)(?: \s - (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) \s+ - (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) \s+ - (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) - )? + +IDEAL_SHORT = complex(-1, 0) +IDEAL_OPEN = complex(1, 0) +IDEAL_LOAD = complex(0, 0) +IDEAL_THROUGH = complex(1, 0) + +RXP_CAL_LINE = { + "short": re.compile(r""" + ^ \s* + (?P\d+) \s+ + (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) \s+ + (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) \s+ + (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) + ( \s+ # optional for backword compatibility + (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) \s+ + (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) + )? \s* $ + """, re.VERBOSE), + "long": re.compile(r""" + ^ \s* + (?P\d+) \s+ + (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) \s+ + (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) \s+ + (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) \s+ + (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) \s+ + (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) \s+ + (?P[-0-9Ee.]+) \s+ (?P[-0-9Ee.]+) + \s* $ + """, re.VERBOSE), +} + + +RXP_CAL_HEADER = re.compile(r""" + ^ \# \s+ Hz \s+ + ShortR \s+ ShortI \s+ OpenR \s+ OpenI \s+ + LoadR \s+ LoadI \s+ ThroughR \s+ ThroughI \s+ + (?PThrureflR \s+ ThrureflI \s+)? IsolationR \s+ IsolationI \s* + $ """, re.VERBOSE) logger = logging.getLogger(__name__) @@ -49,57 +79,84 @@ def correct_delay(d: Datapoint, delay: float, reflect: bool = False): return Datapoint(d.freq, corr_data.real, corr_data.imag) -class CalData(UserDict): - def __init__(self): - data = { - "short": None, - "open": None, - "load": None, - "through": None, - "thrurefl": None, - "isolation": None, - # the frequence - "freq": 0, - # 1 Port - "e00": 0.0, # Directivity - "e11": 0.0, # Port1 match - "delta_e": 0.0, # Tracking - "e10e01": 0.0, # Forward Reflection Tracking - # 2 port - "e30": 0.0, # Forward isolation - "e22": 0.0, # Port2 match - "e10e32": 0.0, # Forward transmission - } - super().__init__(data) +@dataclass +class CalData: + # pylint: disable=too-many-instance-attributes + short: complex = complex(0.0, 0.0) + open: complex = complex(0.0, 0.0) + load: complex = complex(0.0, 0.0) + through: complex = complex(0.0, 0.0) + thrurefl: complex = complex(0.0, 0.0) + isolation: complex = complex(0.0, 0.0) + freq: int = 0 + e00: float = 0.0 # Directivity + e11: float = 0.0 # Port1 match + delta_e: float = 0.0 # Tracking + e10e01: float = 0.0 # Forward Reflection Tracking + # 2 port + e30: float = 0.0 # Forward isolation + e22: float = 0.0 # Port2 match + e10e32: float = 0.0 # Forward transmission def __str__(self): - d = self.data - s = (f'{d["freq"]}' - f' {d["short"].re} {d["short"].im}' - f' {d["open"].re} {d["open"].im}' - f' {d["load"].re} {d["load"].im}') - if d["through"] is not None: - s += (f' {d["through"].re} {d["through"].im}' - f' {d["thrurefl"].re} {d["thrurefl"].im}' - f' {d["isolation"].re} {d["isolation"].im}') - return s - - -class CalDataSet: + return ( + f'{self.freq}' + f' {self.short.real} {self.short.imag}' + f' {self.open.real} {self.open.imag}' + f' {self.load.real} {self.load.imag}' + ( + f' {self.through.real} {self.through.imag}' + f' {self.thrurefl.real} {self.thrurefl.imag}' + f' {self.isolation.real} {self.isolation.imag}' + if self.through else '' + ) + ) + + +@dataclass +class CalElement: + # pylint: disable=too-many-instance-attributes + short_is_ideal: bool = True + short_l0: float = 5.7e-12 + short_l1: float = -8.96e-20 + short_l2: float = -1.1e-29 + short_l3: float = -4.12e-37 + short_length: float = -34.2 # ps + + open_is_ideal: bool = True + open_c0: float = 2.1e-14 + open_c1: float = 5.67e-23 + open_c2: float = -2.39e-31 + open_c3: float = 2.0e-40 + open_length: float = 0.0 + + load_is_ideal: bool = True + load_r: float = 50.0 + load_l: float = 0.0 + load_c: float = 0.0 + load_length: float = 0.0 + + through_is_ideal: bool = True + through_length: float = 0.0 + + +class CalDataSet(UserDict): def __init__(self): - self.data = defaultdict(CalData) + super().__init__() + self.data: defaultdict[int, CalData] = defaultdict(CalData) def insert(self, name: str, dp: Datapoint): - if name not in self.data[dp.freq]: + if name not in {'short', 'open', 'load', + 'through', 'thrurefl', 'isolation'}: raise KeyError(name) - self.data[dp.freq]["freq"] = dp.freq - self.data[dp.freq][name] = dp + freq = dp.freq + setattr(self.data[freq], name, (dp.z)) + self.data[freq].freq = freq def frequencies(self) -> List[int]: return sorted(self.data.keys()) - def get(self, freq: int) -> CalData: - return self.data[freq] + def get(self, key: int, default: CalData = None) -> CalData: + return self.data.get(key, default) def items(self): yield from self.data.items() @@ -109,63 +166,32 @@ def values(self): yield self.get(freq) def size_of(self, name: str) -> int: - return len([v for v in self.data.values() if v[name] is not None]) + return len( + [True for val in self.data.values() if getattr(val, name)] + ) def complete1port(self) -> bool: for val in self.data.values(): - for name in ("short", "open", "load"): - if val[name] is None: - return False + if not all((val.short, val.open, val.load)): + return False return any(self.data) def complete2port(self) -> bool: + if not self.complete1port(): + return False for val in self.data.values(): - for name in ("short", "open", "load", "through", "thrurefl", - "isolation"): - if val[name] is None: - return False + if not all((val.through, val.thrurefl, val.isolation)): + return False return any(self.data) class Calibration: - CAL_NAMES = ("short", "open", "load", "through", "thrurefl", "isolation",) - IDEAL_SHORT = complex(-1, 0) - IDEAL_OPEN = complex(1, 0) - IDEAL_LOAD = complex(0, 0) - def __init__(self): self.notes = [] self.dataset = CalDataSet() + self.cal_element = CalElement() self.interp = {} - - self.useIdealShort = True - self.shortL0 = 5.7 * 10E-12 - self.shortL1 = -8960 * 10E-24 - self.shortL2 = -1100 * 10E-33 - self.shortL3 = -41200 * 10E-42 - self.shortLength = -34.2 # Picoseconfrequenciesds - # These numbers look very large, considering what Keysight - # suggests their numbers are. - - self.useIdealOpen = True - # Subtract 50fF for the nanoVNA calibration if nanoVNA is - # calibrated? - self.openC0 = 2.1 * 10E-14 - self.openC1 = 5.67 * 10E-23 - self.openC2 = -2.39 * 10E-31 - self.openC3 = 2.0 * 10E-40 - self.openLength = 0 - - self.useIdealLoad = True - self.loadR = 25 - self.loadL = 0 - self.loadC = 0 - self.loadLength = 0 - - self.useIdealThrough = True - self.throughLength = 0 - self.isCalculated = False self.source = "Manual" @@ -191,37 +217,37 @@ def _calc_port_1(self, freq: int, cal: CalData): g2 = self.gamma_open(freq) g3 = self.gamma_load(freq) - gm1 = cal["short"].z - gm2 = cal["open"].z - gm3 = cal["load"].z + gm1 = cal.short + gm2 = cal.open + gm3 = cal.load denominator = (g1 * (g2 - g3) * gm1 + g2 * g3 * gm2 - g2 * g3 * gm3 - (g2 * gm2 - g3 * gm3) * g1) - cal["e00"] = - ((g2 * gm3 - g3 * gm3) * g1 * gm2 - - (g2 * g3 * gm2 - g2 * g3 * gm3 - + cal.e00 = - ((g2 * gm3 - g3 * gm3) * g1 * gm2 - + (g2 * g3 * gm2 - g2 * g3 * gm3 - (g3 * gm2 - g2 * gm3) * g1) * gm1 - ) / denominator - cal["e11"] = ((g2 - g3) * gm1 - g1 * (gm2 - gm3) + - g3 * gm2 - g2 * gm3) / denominator - cal["delta_e"] = - ((g1 * (gm2 - gm3) - g2 * gm2 + g3 * - gm3) * gm1 + (g2 * gm3 - g3 * gm3) * - gm2) / denominator + ) / denominator + cal.e11 = ((g2 - g3) * gm1 - g1 * (gm2 - gm3) + + g3 * gm2 - g2 * gm3) / denominator + cal.delta_e = - ((g1 * (gm2 - gm3) - g2 * gm2 + g3 * + gm3) * gm1 + (g2 * gm3 - g3 * gm3) * + gm2) / denominator def _calc_port_2(self, freq: int, cal: CalData): gt = self.gamma_through(freq) - gm4 = cal["through"].z - gm5 = cal["thrurefl"].z - gm6 = cal["isolation"].z - gm7 = gm5 - cal["e00"] + gm4 = cal.through + gm5 = cal.thrurefl + gm6 = cal.isolation + gm7 = gm5 - cal.e00 - cal["e30"] = cal["isolation"].z - cal["e10e01"] = cal["e00"] * cal["e11"] - cal["delta_e"] - cal["e22"] = gm7 / ( - gm7 * cal["e11"] * gt ** 2 + cal["e10e01"] * gt ** 2) - cal["e10e32"] = (gm4 - gm6) * ( - 1 - cal["e11"] * cal["e22"] * gt ** 2) / gt + cal.e30 = cal.isolation + cal.e10e01 = cal.e00 * cal.e11 - cal.delta_e + cal.e22 = gm7 / ( + gm7 * cal.e11 * gt ** 2 + cal.e10e01 * gt ** 2) + cal.e10e32 = (gm4 - gm6) * ( + 1 - cal.e11 * cal.e22 * gt ** 2) / gt def calc_corrections(self): if not self.isValid1Port(): @@ -251,69 +277,57 @@ def calc_corrections(self): logger.debug("Calibration correctly calculated.") def gamma_short(self, freq: int) -> complex: - g = Calibration.IDEAL_SHORT - if not self.useIdealShort: - logger.debug("Using short calibration set values.") - Zsp = complex(0, 2 * math.pi * freq * ( - self.shortL0 + self.shortL1 * freq + - self.shortL2 * freq ** 2 + self.shortL3 * freq ** 3)) - # Referencing https://arxiv.org/pdf/1606.02446.pdf (18) - (21) - g = (Zsp / 50 - 1) / (Zsp / 50 + 1) * cmath.exp( - complex(0, 2 * math.pi * 2 * freq * self.shortLength * -1)) - return g + if self.cal_element.short_is_ideal: + return IDEAL_SHORT + logger.debug("Using short calibration set values.") + cal_element = self.cal_element + Zsp = complex(0.0, 2.0 * math.pi * freq * ( + cal_element.short_l0 + cal_element.short_l1 * freq + + cal_element.short_l2 * freq**2 + cal_element.short_l3 * freq**3)) + # Referencing https://arxiv.org/pdf/1606.02446.pdf (18) - (21) + return (Zsp / 50.0 - 1.0) / (Zsp / 50.0 + 1.0) * cmath.exp( + complex(0.0, + -4.0 * math.pi * freq * cal_element.short_length)) def gamma_open(self, freq: int) -> complex: - g = Calibration.IDEAL_OPEN - if not self.useIdealOpen: - logger.debug("Using open calibration set values.") - Zop = complex(0, 2 * math.pi * freq * ( - self.openC0 + self.openC1 * freq + - self.openC2 * freq ** 2 + self.openC3 * freq ** 3)) - g = ((1 - 50 * Zop) / (1 + 50 * Zop)) * cmath.exp( - complex(0, 2 * math.pi * 2 * freq * self.openLength * -1)) - return g + if self.cal_element.open_is_ideal: + return IDEAL_OPEN + logger.debug("Using open calibration set values.") + cal_element = self.cal_element + Zop = complex(0.0, 2.0 * math.pi * freq * ( + cal_element.open_c0 + cal_element.open_c1 * freq + + cal_element.open_c2 * freq**2 + cal_element.open_c3 * freq**3)) + return ((1.0 - 50.0 * Zop) / (1.0 + 50.0 * Zop)) * cmath.exp( + complex(0.0, + -4.0 * math.pi * freq * cal_element.open_length)) def gamma_load(self, freq: int) -> complex: - g = Calibration.IDEAL_LOAD - if not self.useIdealLoad: - logger.debug("Using load calibration set values.") - Zl = complex(self.loadR, 0) - if self.loadC > 0: - Zl = self.loadR / \ - complex(1, 2 * self.loadR * math.pi * freq * self.loadC) - if self.loadL > 0: - Zl = Zl + complex(0, 2 * math.pi * freq * self.loadL) - g = (Zl / 50 - 1) / (Zl / 50 + 1) * cmath.exp( - complex(0, 2 * math.pi * 2 * freq * self.loadLength * -1)) - return g + if self.cal_element.load_is_ideal: + return IDEAL_LOAD + logger.debug("Using load calibration set values.") + cal_element = self.cal_element + Zl = complex(cal_element.load_r, 0.0) + if cal_element.load_c > 0.0: + Zl = cal_element.load_r / complex( + 1.0, + 2.0 * cal_element.load_r * math.pi * freq * cal_element.load_c) + if cal_element.load_l > 0.0: + Zl = Zl + complex(0.0, 2 * math.pi * freq * cal_element.load_l) + return (Zl / 50.0 - 1.0) / (Zl / 50.0 + 1.0) * cmath.exp( + complex(0.0, -4 * math.pi * freq * cal_element.load_length)) def gamma_through(self, freq: int) -> complex: - g = complex(1, 0) - if not self.useIdealThrough: - logger.debug("Using through calibration set values.") - g = cmath.exp(complex(0, 1) * 2 * math.pi * - self.throughLength * freq * -1) - return g + if self.cal_element.through_is_ideal: + return IDEAL_THROUGH + logger.debug("Using through calibration set values.") + cal_element = self.cal_element + return cmath.exp( + complex(0.0, -2.0 * math.pi * cal_element.through_length * freq)) def gen_interpolation(self): - freq = [] - e00 = [] - e11 = [] - delta_e = [] - e10e01 = [] - e30 = [] - e22 = [] - e10e32 = [] - - for caldata in self.dataset.values(): - freq.append(caldata["freq"]) - e00.append(caldata["e00"]) - e11.append(caldata["e11"]) - delta_e.append(caldata["delta_e"]) - e10e01.append(caldata["e10e01"]) - e30.append(caldata["e30"]) - e22.append(caldata["e22"]) - e10e32.append(caldata["e10e32"]) + (freq, e00, e11, delta_e, e10e01, e30, e22, e10e32) = zip(*[ + (c.freq, c.e00, c.e11, c.delta_e, c.e10e01, c.e30, c.e22, c.e10e32) + for c in self.dataset.values()]) self.interp = { "e00": interp1d(freq, e00, @@ -349,14 +363,14 @@ def correct21(self, dp: Datapoint, dp11: Datapoint): i = self.interp s21 = (dp.z - i["e30"](dp.freq)) / i["e10e32"](dp.freq) s21 = s21 * (i["e10e01"](dp.freq) / (i["e11"](dp.freq) - * dp11.z - i["delta_e"](dp.freq))) + * dp11.z - i["delta_e"](dp.freq))) return Datapoint(dp.freq, s21.real, s21.imag) # TODO: implement tests def save(self, filename: str): # Save the calibration data to file if not self.isValid1Port(): - raise ValueError("Not a valid 1-Port calibration") + raise ValueError("Not a valid calibration") with open(filename, mode="w", encoding='utf-8') as calfile: calfile.write("# Calibration data for NanoVNA-Saver\n") for note in self.notes: @@ -369,13 +383,21 @@ def save(self, filename: str): calfile.write(f"{self.dataset.get(freq)}\n") # TODO: implement tests - # TODO: Exception should be catched by caller def load(self, filename): self.source = os.path.basename(filename) self.dataset = CalDataSet() self.notes = [] - parsed_header = False + header = "" + cols = { + "": (), + "sol": ("short", "open", "load"), + "short": ("short", "open", "load", + "through", "isolation"), + "long": ("short", "open", "load", + "through", "thrurefl", "isolation"), + + } with open(filename, encoding='utf-8') as calfile: for i, line in enumerate(calfile): line = line.strip() @@ -383,26 +405,29 @@ def load(self, filename): note = line[2:] self.notes.append(note) continue + if m := RXP_CAL_HEADER.search(line): + header = "long" if m.group(1) else "short" + columns = cols[header] + logger.debug("found %s header type", header) + continue if line.startswith("#"): - if not parsed_header and line == ( - "# Hz ShortR ShortI OpenR OpenI LoadR LoadI" - " ThroughR ThroughI ThrureflR ThrureflI" - " IsolationR IsolationI"): - parsed_header = True continue - if not parsed_header: + if not header: logger.warning( "Warning: Read line without having read header: %s", line) continue - - m = RXP_CAL_LINE.search(line) + m = RXP_CAL_LINE[header].search(line) if not m: - logger.warning("Illegal data in cal file. Line %i", i) + logger.warning("Illegal data in cal file. Line %i", i + 1) + continue + if (header == "short" and not m.group(8) and + columns != cols["sol"]): + logger.debug("only SOL cal data") + columns = cols["sol"] cal = m.groupdict() - nr_cals = 6 if cal["throughr"] else 3 - for name in Calibration.CAL_NAMES[:nr_cals]: + for name in columns: self.dataset.insert( name, Datapoint(int(cal["freq"]), diff --git a/NanoVNASaver/Charts/Chart.py b/NanoVNASaver/Charts/Chart.py index e3366816..b88f8a85 100644 --- a/NanoVNASaver/Charts/Chart.py +++ b/NanoVNASaver/Charts/Chart.py @@ -26,7 +26,7 @@ from NanoVNASaver import Defaults from NanoVNASaver.RFTools import Datapoint -from NanoVNASaver.Marker import Marker +from NanoVNASaver.Marker.Widget import Marker logger = logging.getLogger(__name__) diff --git a/NanoVNASaver/Charts/Permeability.py b/NanoVNASaver/Charts/Permeability.py index f4c975c4..d8cbc4c7 100644 --- a/NanoVNASaver/Charts/Permeability.py +++ b/NanoVNASaver/Charts/Permeability.py @@ -22,7 +22,7 @@ from PyQt5 import QtGui -from NanoVNASaver.Marker import Marker +from NanoVNASaver.Marker.Widget import Marker from NanoVNASaver.RFTools import Datapoint from NanoVNASaver.SITools import Format, Value from NanoVNASaver.Charts.Chart import Chart diff --git a/NanoVNASaver/Charts/RI.py b/NanoVNASaver/Charts/RI.py index 0b58d2d5..102f51fc 100644 --- a/NanoVNASaver/Charts/RI.py +++ b/NanoVNASaver/Charts/RI.py @@ -23,7 +23,7 @@ from PyQt5 import QtWidgets, QtGui from NanoVNASaver.Formatting import format_frequency_chart -from NanoVNASaver.Marker import Marker +from NanoVNASaver.Marker.Widget import Marker from NanoVNASaver.RFTools import Datapoint from NanoVNASaver.SITools import Format, Value diff --git a/NanoVNASaver/Controls/MarkerControl.py b/NanoVNASaver/Controls/MarkerControl.py index 3f31c4b9..54d18f2b 100644 --- a/NanoVNASaver/Controls/MarkerControl.py +++ b/NanoVNASaver/Controls/MarkerControl.py @@ -22,7 +22,7 @@ from PyQt5.QtWidgets import QCheckBox from NanoVNASaver import Defaults -from NanoVNASaver.Marker import Marker +from NanoVNASaver.Marker.Widget import Marker from NanoVNASaver.Controls.Control import Control logger = logging.getLogger(__name__) diff --git a/NanoVNASaver/Hardware/Hardware.py b/NanoVNASaver/Hardware/Hardware.py index 67d91a55..5796a76b 100644 --- a/NanoVNASaver/Hardware/Hardware.py +++ b/NanoVNASaver/Hardware/Hardware.py @@ -24,6 +24,7 @@ import serial from serial.tools import list_ports +from serial.tools.list_ports_common import ListPortInfo from NanoVNASaver.Hardware.VNA import VNA from NanoVNASaver.Hardware.AVNA import AVNA @@ -74,29 +75,50 @@ def _fix_v2_hwinfo(dev): return dev +def usb_typename(device: ListPortInfo) -> str: + return next((t.name for t in USBDEVICETYPES if + device.vid == t.vid and device.pid == t.pid), + "") + # Get list of interfaces with VNAs connected + + def get_interfaces() -> List[Interface]: interfaces = [] # serial like usb interfaces for d in list_ports.comports(): if platform.system() == 'Windows' and d.vid is None: d = _fix_v2_hwinfo(d) - for t in USBDEVICETYPES: - if d.vid != t.vid or d.pid != t.pid: - continue - logger.debug("Found %s USB:(%04x:%04x) on port %s", - t.name, d.vid, d.pid, d.device) - iface = Interface('serial', t.name) - iface.port = d.device - iface.open() - iface.comment = get_comment(iface) - iface.close() - interfaces.append(iface) + if not (typename := usb_typename(d)): + continue + logger.debug("Found %s USB:(%04x:%04x) on port %s", + typename, d.vid, d.pid, d.device) + iface = Interface('serial', typename) + iface.port = d.device + iface.open() + iface.comment = get_comment(iface) + iface.close() + interfaces.append(iface) logger.debug("Interfaces: %s", interfaces) return interfaces +def get_portinfos() -> List[str]: + portinfos = [] + # serial like usb interfaces + for d in list_ports.comports(): + logger.debug("Found USB:(%04x:%04x) on port %s", + d.vid, d.pid, d.device) + iface = Interface('serial', "DEBUG") + iface.port = d.device + iface.open() + version = detect_version(iface) + iface.close() + portinfos.append(version) + return portinfos + + def get_VNA(iface: Interface) -> VNA: # serial_port.timeout = TIMEOUT return NAME2DEVICE[iface.comment](iface) diff --git a/NanoVNASaver/Marker/Delta.py b/NanoVNASaver/Marker/Delta.py index 2d9b7129..ba76093a 100644 --- a/NanoVNASaver/Marker/Delta.py +++ b/NanoVNASaver/Marker/Delta.py @@ -35,7 +35,7 @@ format_wavelength, ) -from .Widget import Marker +from NanoVNASaver.Marker.Widget import Marker class DeltaMarker(Marker): diff --git a/NanoVNASaver/Marker/__init__.py b/NanoVNASaver/Marker/__init__.py index bb96c2d9..e69de29b 100644 --- a/NanoVNASaver/Marker/__init__.py +++ b/NanoVNASaver/Marker/__init__.py @@ -1,3 +0,0 @@ -from .Widget import Marker -from .Delta import DeltaMarker -from .Values import Value, default_label_ids diff --git a/NanoVNASaver/NanoVNASaver.py b/NanoVNASaver/NanoVNASaver.py index 8182d812..c47650ad 100644 --- a/NanoVNASaver/NanoVNASaver.py +++ b/NanoVNASaver/NanoVNASaver.py @@ -16,6 +16,7 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . +import contextlib import logging import sys import threading @@ -46,9 +47,11 @@ SmithChart, SParameterChart, TDRChart, ) from .Calibration import Calibration -from .Marker import Marker, DeltaMarker +from .Marker.Widget import Marker +from .Marker.Delta import DeltaMarker from .SweepWorker import SweepWorker -from .Settings import BandsModel, Sweep +from .Settings.Bands import BandsModel +from .Settings.Sweep import Sweep from .Touchstone import Touchstone from .About import VERSION @@ -100,10 +103,7 @@ def __init__(self): self.bands = BandsModel() self.interface = Interface("serial", "None") - try: - self.vna = VNA(self.interface) - except IOError as exc: - self.showError(f"{exc}\n\nPlease try reconnect") + self.vna = VNA(self.interface) self.dataLock = threading.Lock() self.data = Touchstone() @@ -490,10 +490,8 @@ def markerUpdated(self, marker: Marker): else: self.delta_marker.set_markers(m1, m2) self.delta_marker.resetLabels() - try: + with contextlib.suppress(IndexError): self.delta_marker.updateLabels() - except IndexError: - pass def dataUpdated(self): with self.dataLock: @@ -571,11 +569,7 @@ def setReference(self, s11=None, s21=None, source=None): self.btnResetReference.setDisabled(False) - if source is not None: - # Save the reference source info - self.referenceSource = source - else: - self.referenceSource = self.sweepSource + self.referenceSource = source or self.sweepSource self.updateTitle() def updateTitle(self): @@ -589,7 +583,7 @@ def updateTitle(self): f"Reference: {self.referenceSource} @" f" {len(self.ref_data.s11)} points") insert += ")" - title = f"{self.baseTitle} {insert if insert else ''}" + title = f"{self.baseTitle} {insert or ''}" self.setWindowTitle(title) def resetReference(self): @@ -612,11 +606,9 @@ def showError(self, text): def showSweepError(self): self.showError(self.worker.error_message) - try: + with contextlib.suppress(IOError): self.vna.flushSerialBuffers() # Remove any left-over data - self.vna.reconnect() # try reconnection - except IOError: - pass + self.vna.reconnect() # try reconnection self.sweepFinished() def popoutChart(self, chart: Chart): diff --git a/NanoVNASaver/SITools.py b/NanoVNASaver/SITools.py index 686a0b4f..c3262b1b 100644 --- a/NanoVNASaver/SITools.py +++ b/NanoVNASaver/SITools.py @@ -84,7 +84,7 @@ def __repr__(self) -> str: def __str__(self) -> str: fmt = self.fmt if math.isnan(self._value): - return (f"-{fmt.space_str}{self._unit}") + return f"-{fmt.space_str}{self._unit}" if (fmt.assume_infinity and abs(self._value) >= 10 ** ((fmt.max_offset + 1) * 3)): return (("-" if self._value < 0 else "") + diff --git a/NanoVNASaver/Settings/__init__.py b/NanoVNASaver/Settings/__init__.py index 38cacea0..e69de29b 100644 --- a/NanoVNASaver/Settings/__init__.py +++ b/NanoVNASaver/Settings/__init__.py @@ -1,2 +0,0 @@ -from .Bands import BandsModel -from .Sweep import Sweep diff --git a/NanoVNASaver/SweepWorker.py b/NanoVNASaver/SweepWorker.py index e2ac7b4a..d43e3464 100644 --- a/NanoVNASaver/SweepWorker.py +++ b/NanoVNASaver/SweepWorker.py @@ -74,57 +74,34 @@ def __init__(self, app: QtWidgets.QWidget): self.offsetDelay = 0 @pyqtSlot() - def run(self): + def run(self) -> None: try: self._run() except BaseException as exc: # pylint: disable=broad-except logger.exception("%s", exc) self.gui_error(f"ERROR during sweep\n\nStopped\n\n{exc}") - return - # raise exc + if logger.isEnabledFor(logging.DEBUG): + raise exc - def _run(self): + def _run(self) -> None: logger.info("Initializing SweepWorker") - self.running = True - self.percentage = 0 - if not self.app.vna.connected(): logger.debug( "Attempted to run without being connected to the NanoVNA") self.running = False return + self.running = True + self.percentage = 0 + with self.app.sweep.lock: sweep = self.app.sweep.copy() - averages = 1 - if sweep.properties.mode == SweepMode.AVERAGE: - averages = sweep.properties.averages[0] - logger.info("%d averages", averages) - if sweep != self.sweep: # parameters changed self.sweep = sweep self.init_data() - while True: - for i in range(sweep.segments): - logger.debug("Sweep segment no %d", i) - if self.stopped: - logger.debug("Stopping sweeping as signalled") - break - start, stop = sweep.get_index_range(i) - - try: - freq, values11, values21 = self.readAveragedSegment( - start, stop, averages) - self.percentage = (i + 1) * 100 / sweep.segments - self.updateData(freq, values11, values21, i) - except ValueError as e: - self.gui_error(str(e)) - else: - if sweep.properties.mode == SweepMode.CONTINOUS: - continue - break + self._run_loop() if sweep.segments > 1: start = sweep.start @@ -138,6 +115,28 @@ def _run(self): self.signals.finished.emit() self.running = False + def _run_loop(self) -> None: + sweep = self.sweep + averages = (sweep.properties.averages[0] + if sweep.properties.mode == SweepMode.AVERAGE + else 1) + logger.info("%d averages", averages) + + while True: + for i in range(sweep.segments): + logger.debug("Sweep segment no %d", i) + if self.stopped: + logger.debug("Stopping sweeping as signalled") + break + start, stop = sweep.get_index_range(i) + + freq, values11, values21 = self.readAveragedSegment( + start, stop, averages) + self.percentage = (i + 1) * 100 / sweep.segments + self.updateData(freq, values11, values21, i) + if sweep.properties.mode != SweepMode.CONTINOUS: + break + def init_data(self): self.data11 = [] self.data21 = [] @@ -156,16 +155,11 @@ def updateData(self, frequencies, values11, values21, index): "Calculating data and inserting in existing data at index %d", index) offset = self.sweep.points * index - v11 = values11[:] - v21 = values21[:] - raw_data11 = [] - raw_data21 = [] - - for freq in frequencies: - real11, imag11 = v11.pop(0) - real21, imag21 = v21.pop(0) - raw_data11.append(Datapoint(freq, real11, imag11)) - raw_data21.append(Datapoint(freq, real21, imag21)) + + raw_data11 = [Datapoint(freq, values11[i][0], values11[i][1]) + for i, freq in enumerate(frequencies)] + raw_data21 = [Datapoint(freq, values21[i][0], values21[i][1]) + for i, freq in enumerate(frequencies)] data11, data21 = self.applyCalibration(raw_data11, raw_data21) logger.debug("update Freqs: %s, Offset: %s", len(frequencies), offset) diff --git a/NanoVNASaver/Windows/CalibrationSettings.py b/NanoVNASaver/Windows/CalibrationSettings.py index fb172508..a8cacc3b 100644 --- a/NanoVNASaver/Windows/CalibrationSettings.py +++ b/NanoVNASaver/Windows/CalibrationSettings.py @@ -70,7 +70,8 @@ def __init__(self, app: QtWidgets.QWidget): calibration_control_group) cal_btn = {} self.cal_label = {} - for label_name in Calibration.CAL_NAMES: + for label_name in ("short", "open", "load", + "through", "thrurefl", "isolation"): self.cal_label[label_name] = QtWidgets.QLabel("Uncalibrated") cal_btn[label_name] = QtWidgets.QPushButton( label_name.capitalize()) @@ -497,77 +498,60 @@ def setOffsetDelay(self, value: float): self.app.worker.signals.updated.emit() def calculate(self): - def _warn_ideal(cal_type: str) -> str: - return ( - 'Invalid data for "{cal_type}" calibration standard.' - ' Using ideal values.') - + cal_element = self.app.calibration.cal_element if self.app.sweep_control.btn_stop.isEnabled(): - # Currently sweeping self.app.showError( "Unable to apply calibration while a sweep is running." " Please stop the sweep and try again.") return - if self.use_ideal_values.isChecked(): - self.app.calibration.useIdealShort = True - self.app.calibration.useIdealOpen = True - self.app.calibration.useIdealLoad = True - self.app.calibration.useIdealThrough = True - else: + + cal_element.short_is_ideal = True + cal_element.open_is_ideal = True + cal_element.load_is_ideal = True + cal_element.throuh_is_ideal = True + + # TODO: all ideal or not? + if not self.use_ideal_values.isChecked(): + cal_element.short_is_ideal = False + cal_element.open_is_ideal = False + cal_element.load_is_ideal = False + cal_element.throuh_is_ideal = False + # We are using custom calibration standards - try: - self.app.calibration.shortL0 = self.getFloatValue( - self.short_l0_input.text()) / 10 ** 12 - self.app.calibration.shortL1 = self.getFloatValue( - self.short_l1_input.text()) / 10 ** 24 - self.app.calibration.shortL2 = self.getFloatValue( - self.short_l2_input.text()) / 10 ** 33 - self.app.calibration.shortL3 = self.getFloatValue( - self.short_l3_input.text()) / 10 ** 42 - self.app.calibration.shortLength = self.getFloatValue( - self.short_length.text()) / 10 ** 12 - self.app.calibration.useIdealShort = False - except ValueError: - self.app.calibration.useIdealShort = True - logger.warning(_warn_ideal("short")) - - try: - self.app.calibration.openC0 = self.getFloatValue( - self.open_c0_input.text()) / 10 ** 15 - self.app.calibration.openC1 = self.getFloatValue( - self.open_c1_input.text()) / 10 ** 27 - self.app.calibration.openC2 = self.getFloatValue( - self.open_c2_input.text()) / 10 ** 36 - self.app.calibration.openC3 = self.getFloatValue( - self.open_c3_input.text()) / 10 ** 45 - self.app.calibration.openLength = self.getFloatValue( - self.open_length.text()) / 10 ** 12 - self.app.calibration.useIdealOpen = False - except ValueError: - self.app.calibration.useIdealOpen = True - logger.warning(_warn_ideal("open")) - - try: - self.app.calibration.loadR = self.getFloatValue( - self.load_resistance.text()) - self.app.calibration.loadL = self.getFloatValue( - self.load_inductance.text()) / 10 ** 12 - self.app.calibration.loadC = self.getFloatValue( - self.load_capacitance.text()) / 10 ** 15 - self.app.calibration.loadLength = self.getFloatValue( - self.load_length.text()) / 10 ** 12 - self.app.calibration.useIdealLoad = False - except ValueError: - self.app.calibration.useIdealLoad = True - logger.warning(_warn_ideal("load")) - - try: - self.app.calibration.throughLength = self.getFloatValue( - self.through_length.text()) / 10 ** 12 - self.app.calibration.useIdealThrough = False - except ValueError: - self.app.calibration.useIdealThrough = True - logger.warning(_warn_ideal("through")) + + cal_element.short_l0 = self.getFloatValue( + self.short_l0_input.text()) / 1.0e12 + cal_element.short_l1 = self.getFloatValue( + self.short_l1_input.text()) / 1.0e24 + cal_element.short_l2 = self.getFloatValue( + self.short_l2_input.text()) / 1.0e33 + cal_element.short_l3 = self.getFloatValue( + self.short_l3_input.text()) / 1.0e42 + cal_element.short_length = self.getFloatValue( + self.short_length.text()) / 1.0e12 + + cal_element.open_c0 = self.getFloatValue( + self.open_c0_input.text()) / 1.e15 + cal_element.open_c1 = self.getFloatValue( + self.open_c1_input.text()) / 1.e27 + cal_element.open_c2 = self.getFloatValue( + self.open_c2_input.text()) / 1.0e36 + cal_element.open_c3 = self.getFloatValue( + self.open_c3_input.text()) / 1.0e45 + cal_element.openLength = self.getFloatValue( + self.open_length.text()) / 1.0e12 + + cal_element.load_r = self.getFloatValue( + self.load_resistance.text()) + cal_element.load_l = self.getFloatValue( + self.load_inductance.text()) / 1.0e12 + cal_element.load_c = self.getFloatValue( + self.load_capacitance.text()) / 1.0e15 + cal_element.load_length = self.getFloatValue( + self.load_length.text()) / 1.0e12 + + cal_element.through_length = self.getFloatValue( + self.through_length.text()) / 1.0e12 logger.debug("Attempting calibration calculation.") try: @@ -594,6 +578,8 @@ def _warn_ideal(cal_type: str) -> str: self.app.worker.data21, self.app.sweepSource) self.app.worker.signals.updated.emit() except ValueError as e: + if logger.isEnabledFor(logging.DEBUG): + raise # showError here hides the calibration window, # so we need to pop up our own QtWidgets.QMessageBox.warning( @@ -604,7 +590,10 @@ def _warn_ideal(cal_type: str) -> str: @staticmethod def getFloatValue(text: str) -> float: - return float(text) if text else 0.0 + try: + return float(text) + except (TypeError, ValueError): + return 0.0 def loadCalibration(self): filename, _ = QtWidgets.QFileDialog.getOpenFileName( diff --git a/NanoVNASaver/Windows/DisplaySettings.py b/NanoVNASaver/Windows/DisplaySettings.py index f1c6526b..b41c22e6 100644 --- a/NanoVNASaver/Windows/DisplaySettings.py +++ b/NanoVNASaver/Windows/DisplaySettings.py @@ -26,7 +26,7 @@ Chart, ChartColors) from NanoVNASaver.Windows.Bands import BandsWindow from NanoVNASaver.Windows.MarkerSettings import MarkerSettingsWindow -from NanoVNASaver.Marker import Marker +from NanoVNASaver.Marker.Widget import Marker logger = logging.getLogger(__name__) diff --git a/NanoVNASaver/Windows/MarkerSettings.py b/NanoVNASaver/Windows/MarkerSettings.py index 4e949a84..ecdb8eae 100644 --- a/NanoVNASaver/Windows/MarkerSettings.py +++ b/NanoVNASaver/Windows/MarkerSettings.py @@ -21,7 +21,7 @@ from PyQt5 import QtWidgets, QtCore, QtGui from NanoVNASaver.RFTools import Datapoint -from NanoVNASaver.Marker import Marker +from NanoVNASaver.Marker.Widget import Marker from NanoVNASaver.Marker.Values import TYPES, default_label_ids logger = logging.getLogger(__name__) diff --git a/NanoVNASaver/Windows/TDR.py b/NanoVNASaver/Windows/TDR.py index 4ed943b5..b16de89b 100644 --- a/NanoVNASaver/Windows/TDR.py +++ b/NanoVNASaver/Windows/TDR.py @@ -20,7 +20,7 @@ import math import numpy as np -import scipy +from scipy.signal import convolve from scipy.constants import speed_of_light from PyQt5 import QtWidgets, QtCore @@ -137,7 +137,7 @@ def updateTDR(self): windowed_s11 = window * s11 self.td = np.abs(np.fft.ifft(windowed_s11, FFT_POINTS)) step = np.ones(FFT_POINTS) - step_response = scipy.signal.convolve(self.td, step) + step_response = convolve(self.td, step) self.step_response_Z = 50 * ( 1 + step_response) / (1 - step_response)