Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added parallel processing on output. Fixed some bugs. Added trace ver… #6

Merged
merged 1 commit into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 81 additions & 41 deletions src/dsp/dsp_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,25 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import json
import os
import signal as s
import struct
import sys
from functools import partial
from multiprocessing import Pool

import numpy as np
from scipy import signal

from dsp.data_processor import DataProcessor
from dsp.demodulation import amDemod, fmDemod, realDemod
from dsp.iq_correction import IQCorrection
from dsp.util import applyFilters, cnormalize, convertDeinterlRealToComplex, \
generateBroadcastOutputFilter, generateFmOutputFilters, shiftFreq
from misc.general_util import deinterleave, vprint, printException
from dsp.util import applyFilters, generateBroadcastOutputFilter, generateFmOutputFilters, shiftFreq
from misc.general_util import deinterleave, vprint, printException, eprint, applyIgnoreException


class DspProcessor(DataProcessor):
_FILTER_DEGREE = 4
_FILTER_DEGREE = 8

def __init__(self,
fs: int,
Expand All @@ -42,8 +46,8 @@ def __init__(self,
demod: str = None,
tunedFreq: int = None,
vfos: str = None,
normalize: bool = False,
correctIq:bool = False):
correctIq: bool = False,
**kwargs):

decimation = decimation if decimation is not None else 1
self.outputFilters = []
Expand All @@ -62,8 +66,7 @@ def __init__(self,
self.tunedFreq = tunedFreq
self.vfos = vfos
self.omegaOut = omegaOut
self.normalize = cnormalize if normalize else lambda x: x
self.correctIq = IQCorrection(self.decimatedFs).correctIq if correctIq else lambda x: x
self.correctIq = IQCorrection(self.decimatedFs) if correctIq else None

def setDecimation(self, decimation):
if decimation is not None:
Expand Down Expand Up @@ -91,17 +94,17 @@ def selectOuputFm(self):
vprint('NFM Selected')
self.bandwidth = 12500
self.outputFilters = [signal.ellip(self._FILTER_DEGREE, 1, 30, self.omegaOut,
# self.outputFilters = [signal.butter(self._FILTER_DEGREE, self.omegaOut,
btype='lowpass',
analog=False,
output='sos',
fs=self.decimatedFs >> 1)]
# self.outputFilters = [signal.butter(self._FILTER_DEGREE, self.omegaOut,
btype='lowpass',
analog=False,
output='sos',
fs=self.decimatedFs)]
self.setDemod(fmDemod)

def selectOuputWfm(self):
vprint('WFM Selected')
self.bandwidth = 15000
self.outputFilters = generateFmOutputFilters(self.decimatedFs >> 1, self._FILTER_DEGREE,
self.outputFilters = generateFmOutputFilters(self.decimatedFs, self._FILTER_DEGREE,
18000)
self.setDemod(fmDemod)

Expand All @@ -111,38 +114,75 @@ def selectOuputAm(self):
self.outputFilters = [generateBroadcastOutputFilter(self.decimatedFs, self._FILTER_DEGREE)]
self.setDemod(amDemod)

def processChunk(self, y):
try:
y = shiftFreq(y, self.centerFreq, self.fs)
y = signal.decimate(y, self.decimationFactor, ftype='fir')
y = signal.sosfilt(self.sosIn, y)
y = self.demod(y)
y = applyFilters(y, self.outputFilters)
return y
except KeyboardInterrupt:
pass
except Exception as e:
eprint(f'Chunk processing encountered: {e}')
printException(e)
return None

def handleException(self, isDead, e):
isDead.value = 1
if not isinstance(e, KeyboardInterrupt):
printException(e)

def processData(self, isDead, pipe, f) -> None:
if f is None or (isinstance(f, str)) and len(f) < 1 \
or self.demod is None:
raise ValueError('f is not defined')
reader, writer = pipe
with open(f, 'wb') as file:
try:
while not isDead.value:
writer.close()
y = reader.recv()
if y is None or len(y) < 1:
break
y = deinterleave(y)
y = convertDeinterlRealToComplex(y)
y = self.normalize(y)
y = self.correctIq(y)
y = shiftFreq(y, self.centerFreq, self.fs)
y = signal.decimate(y, self.decimationFactor, ftype='fir')
y = signal.sosfilt(self.sosIn, y)
y = self.demod(y)
y = applyFilters(y, self.outputFilters)
file.write(struct.pack(len(y) * 'd', *y))
except (EOFError, KeyboardInterrupt):
pass
except Exception as e:
printException(e)
finally:
isDead.value = 1
file.write(b'')
reader.close()
print(f'File writer halted')
s.signal(s.SIGINT, s.SIG_IGN)

try:
with open(f, 'wb') if f != sys.stdout.fileno() else open(f, 'wb', closefd=False) as file:
with Pool(maxtasksperchild=128) as pool:
chunks = []
perCpu = 1 / os.cpu_count()
while not isDead.value:
writer.close()
y = reader.recv()

if y is None or len(y) < 1:
break

y = deinterleave(y)
if self.correctIq is not None:
y = self.correctIq.correctIq(y)
n = len(y)
stride = int(n * perCpu)
for i in range(0, n, stride):
chunks.append(pool.apply_async(self.processChunk,
args=(y[i:i + stride],),
error_callback=partial(self.handleException, isDead)))

for chunk in chunks:
data = chunk.get()
file.write(struct.pack(len(data) * 'd', *data))
chunks.clear()
except (EOFError, KeyboardInterrupt, BrokenPipeError):
pass
except Exception as e:
printException(e)
finally:
isDead.value = 1
pool.close()
pool.join()
applyIgnoreException(partial(file.write, b''))
reader.close()
print(f'File writer halted')

def __repr__(self):
return json.dumps({key: value for key, value in self.__dict__.items()
if not key.startswith('__') and not callable(key)}, indent=2)
if not key.startswith('__')
and not callable(value)
and not isinstance(value, np.ndarray)
and not isinstance(value, IQCorrection)
and key not in {'outputFilters'}}, indent=2)
3 changes: 3 additions & 0 deletions src/dsp/iq_correction.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import numpy as np

from misc.general_util import tprint


class IQCorrection:
def __init__(self, sampRate: int, impedance: int = 50):
Expand Down Expand Up @@ -60,4 +62,5 @@ def correctIq(self, data: np.ndarray[any, np.complex_]) -> np.ndarray[any, np.co
for i in range(len(data)):
z = data[i] = data[i] - self.__off
self.__off += z * self.__inductance
tprint(self.__off)
return data
5 changes: 0 additions & 5 deletions src/dsp/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,6 @@
from scipy import signal


def convertDeinterlRealToComplex(y: list[Number] | np.ndarray[any, np.real]) -> \
np.ndarray[any, np.complex_]:
return np.array([re + 1j * im for re, im in zip(y[:len(y) // 2], y[len(y) // 2:])])


def shiftFreq(y: np.ndarray[any, np.complex_] | list[Complex], freq: Real, fs: Real) -> np.ndarray[
any, np.complex_]:
if not freq or not fs:
Expand Down
65 changes: 35 additions & 30 deletions src/dsp/vfo_processor.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import os
import struct
from functools import partial
from multiprocessing import Pool
import signal as s

from scipy import signal

from dsp.dsp_processor import DspProcessor
from dsp.util import applyFilters, cnormalize, convertDeinterlRealToComplex, shiftFreq
from misc.general_util import applyIgnoreException, deinterleave, printException
from dsp.util import applyFilters, shiftFreq
from misc.general_util import applyIgnoreException, deinterleave, printException, eprint


class VfoProcessor(DspProcessor):
Expand All @@ -19,11 +21,11 @@ def handleOutput(self, file, freq, y):
return os.write(file, struct.pack(len(y) * 'd', *y))

def processData(self, isDead, pipe, f) -> None:
s.signal(s.SIGINT, s.SIG_IGN) # https://stackoverflow.com/a/68695455/8372013
if f is None or (isinstance(f, str)) and len(f) < 1 \
or self.demod is None:
raise ValueError('f is not defined')
reader, writer = pipe
normalize = cnormalize if self.normalize else lambda x: x
n = len(self.vfos)

namedPipes = []
Expand All @@ -32,30 +34,33 @@ def processData(self, isDead, pipe, f) -> None:
os.mkfifo(name)
namedPipes.append((name, open(name, 'wb', os.O_WRONLY | os.O_NONBLOCK)))

with Pool(processes=n) as pool:
try:
results = []
while not isDead.value:
writer.close()
y = reader.recv()
if y is None or len(y) < 1:
break
y = deinterleave(y)
y = convertDeinterlRealToComplex(y)
y = normalize(y)
y = shiftFreq(y, self.centerFreq, self.fs)
y = signal.decimate(y, self.decimationFactor, ftype='fir')
[r.get() for r in results] # wait for any prior processing to complete
results = [pool.apply_async(self.handleOutput, (file.fileno(), freq, y)) for (name, file), freq in zip(namedPipes, self.vfos)]

except (EOFError, KeyboardInterrupt, BrokenPipeError):
pass
except Exception as e:
printException(e)
finally:
isDead.value = 1
for n, fd in namedPipes:
applyIgnoreException(lambda: os.close(fd.fileno()))
os.unlink(n)
reader.close()
print(f'File writer halted')
try:
with Pool(maxtasksperchild=128) as pool:
results = []
while not isDead.value:
writer.close()
y = reader.recv()
if y is None or len(y) < 1:
break
y = deinterleave(y)
if self.correctIq is not None:
y = self.correctIq.correctIq(y)
y = shiftFreq(y, self.centerFreq, self.fs)
y = signal.decimate(y, self.decimationFactor, ftype='fir')
[r.get() for r in results] # wait for any prior processing to complete
results = [pool.apply_async(self.handleOutput, (file.fileno(), freq, y), error_callback=eprint) for
(name, file), freq in zip(namedPipes, self.vfos)]
except (EOFError, KeyboardInterrupt, BrokenPipeError):
pass
except Exception as e:
printException(e)
finally:
isDead.value = 1
pool.close()
pool.join()
for n, fd in namedPipes:
applyIgnoreException(partial(os.write, fd.fileno(), b''))
applyIgnoreException(partial(os.close, fd.fileno()))
os.unlink(n)
reader.close()
print(f'File writer halted')
48 changes: 45 additions & 3 deletions src/misc/general_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@
#
import sys
import traceback
from numbers import Number
from typing import Callable

import numpy as np


class __VerbosePrint:
__verbose = False
__trace = False

@property
def verbose(self):
Expand All @@ -37,11 +41,28 @@ def verbose(self, value):
def verbose(self):
del self.__verbose

@property
def trace(self):
return self.__trace

@trace.setter
def trace(self, value):
self.__verbose = self.__trace = value

@trace.deleter
def trace(self):
del self.__trace

@classmethod
def vprint(cls, *args, **kwargs):
if cls.verbose:
eprint(*args, **kwargs)

@classmethod
def tprint(cls, *args, **kwargs):
if cls.trace:
eprint(*args, **kwargs)


def eprint(*args, **kwargs):
return print(*args, file=sys.stderr, **kwargs)
Expand All @@ -51,14 +72,27 @@ def vprint(*args, **kwargs):
__VerbosePrint.vprint(*args, **kwargs)


def tprint(*args, **kwargs):
__VerbosePrint.tprint(*args, **kwargs)


def interleave(x: list, y: list) -> list:
out = [x for xs in zip(x, y) for x in xs]
return out


def deinterleave(y: list) -> list:
y = [y[i::2] for i in range(2)]
return y[0] + y[1]
# def convertDeinterlRealToComplex(y: np.ndarray[any, np.real]) -> np.ndarray:
# return np.array([re + 1j * im for re, im in zip(y[:len(y) // 2], y[len(y) // 2:])])
#
#
# def deinterleave(y: list) -> list:
# y = [y[i::2] for i in range(2)]
# return y[0] + y[1]


def deinterleave(y: list[Number] | np.ndarray[any, np.number]) -> np.ndarray[any, np.complex_]:
y = [a + 1j * b for a, b in zip(y[::2], y[1::2])]
return np.array(y)


def printException(e):
Expand All @@ -75,3 +109,11 @@ def applyIgnoreException(func: Callable[[], None]):

def setVerbose(verbose: bool):
__VerbosePrint.verbose = verbose


def setTrace(trace: bool):
__VerbosePrint.trace = trace


def poolErrorCallback(value):
eprint(value)
Loading