Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Output server #8

Merged
merged 15 commits into from
May 26, 2024
84 changes: 0 additions & 84 deletions src/cariboulite_print.py

This file was deleted.

52 changes: 25 additions & 27 deletions src/dsp/dsp_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import itertools
import json
import os
import signal as s
import struct
import sys
from functools import partial
from multiprocessing import Pool
from multiprocessing import Pool, Value, Pipe
from typing import Callable

import numpy as np
from scipy import signal
Expand All @@ -32,8 +33,8 @@
from dsp.demodulation import amDemod, fmDemod, realOutput
from dsp.iq_correction import IQCorrection
from dsp.util import applyFilters, generateBroadcastOutputFilter, generateFmOutputFilters, shiftFreq
from misc.general_util import applyIgnoreException, deinterleave, eprint, printException, tprint, \
vprint
from misc.general_util import applyIgnoreException, deinterleave, printException, tprint, \
vprint, initializer


def handleException(isDead, e):
Expand All @@ -49,21 +50,21 @@ def __init__(self,
fs: int,
centerFreq: float,
omegaOut: int,
demod: str = None,
tunedFreq: int = None,
vfos: str = None,
correctIq: bool = False,
decimation: int = 1,
tunedFreq: int,
vfos: str,
correctIq: bool,
decimation: int,
demod: Callable[[np.ndarray[any, np.complex_]], np.ndarray] = realOutput,
**_):

decimation = decimation if decimation is not None else 2
# decimation = decimation if decimation is not None else 2
self.outputFilters = []
self.sosIn = None
self.__decimatedFs = self.__fs = fs
self.__decimationFactor = decimation
self.__decimatedFs //= decimation
self.centerFreq = centerFreq
self.demod = demod if demod is not None else realOutput
self.demod = demod
self.bandwidth = None
self.tunedFreq = tunedFreq
self.vfos = vfos
Expand Down Expand Up @@ -142,7 +143,7 @@ def selectOuputAm(self):
generateBroadcastOutputFilter(self.__decimatedFs, self._FILTER_DEGREE)]
self.setDemod(amDemod)

def processChunk(self, y):
def processChunk(self, y: list) -> np.ndarray[any, np.real] | None:
try:
y = deinterleave(y)

Expand All @@ -158,37 +159,34 @@ def processChunk(self, y):
y = self.demod(y)
y = applyFilters(y, self.outputFilters)

return signal.savgol_filter(y, 14, self._FILTER_DEGREE)
return y
except KeyboardInterrupt:
pass
except Exception as e:
eprint(f'Chunk processing encountered: {e}')
printException(e)
return None
return None

def processData(self, isDead, pipe, f) -> None:
def processData(self, isDead: Value, pipe: Pipe, f) -> None:
reader, writer = pipe
if 'posix' in os.name:
s.signal(s.SIGINT, s.SIG_IGN) # https://stackoverflow.com/a/68695455/8372013

data = []
try:
with open(f, 'wb') if f is not None else open(sys.stdout.fileno(), 'wb',
closefd=False) as file:
tprint(f'{f} {file}')
with Pool(maxtasksperchild=128) as pool:
data = []
with Pool(initializer=initializer, initargs=(isDead,)) as pool:
ii = range(os.cpu_count())
while not isDead.value:
writer.close()
for _ in range(self.__decimationFactor):
for _ in ii:
y = reader.recv()
if y is None or len(y) < 1:
if y is None or not len(y):
break
data.append(y)

if data is None or len(data) < 1:
if data is None or not len(data):
break

y = np.array(pool.map_async(self.processChunk, data).get()).flatten()
y = pool.map_async(self.processChunk, data)
y = list(itertools.chain.from_iterable(y.get()))
y = signal.savgol_filter(y, 14, self._FILTER_DEGREE)
file.write(struct.pack(len(y) * 'd', *y))
data.clear()
except (EOFError, KeyboardInterrupt, BrokenPipeError):
Expand Down
131 changes: 81 additions & 50 deletions src/dsp/vfo_processor.py
Original file line number Diff line number Diff line change
@@ -1,69 +1,100 @@
import os
import signal as s
import struct
import socket
from contextlib import closing
from functools import partial
from multiprocessing import Pool
from multiprocessing import Pool, Pipe, Value
from threading import Barrier

import numpy as np
from scipy import signal

from dsp.dsp_processor import DspProcessor
from dsp.util import applyFilters, shiftFreq
from misc.general_util import applyIgnoreException, deinterleave, eprint, printException
from misc.general_util import deinterleave, eprint, printException, initializer, closeSocket
from sdr.output_server import OutputServer, Receiver


class VfoProcessor(DspProcessor):
class PipeReceiver(Receiver):
def __init__(self, pipe: Pipe, vfos: int):
receiver, self.__writer = pipe
super().__init__(receiver, Barrier(vfos + 1)) # +1 for receiver thread

def __exit__(self, *ex):
self.__writer.close()
self._receiver.close()

def receive(self):
if not self._barrier.broken:
self._barrier.wait()
self._barrier.abort()
return self.receiver.recv()

def handleOutput(self, file, freq, y):
y = shiftFreq(y, freq, self.decimatedFs)
y = signal.sosfilt(self.sosIn, y)
y = self.demod(y)
y = applyFilters(y, self.outputFilters)
return os.write(file, struct.pack(len(y) * 'd', *y))

def processData(self, isDead, pipe, f) -> None:
if 'posix' in os.name:
s.signal(s.SIGINT, s.SIG_IGN) # https://stackoverflow.com/a/68695455/8372013
if f is None or (isinstance(f, str)) and len(f) < 1 \
or self.demod is None:
raise ValueError('f is not defined')
reader, writer = pipe
n = len(self.vfos)
class VfoProcessor(DspProcessor):

def __init__(self,
fs: int,
centerFreq: float,
omegaOut: int,
tunedFreq: int,
vfos: str,
correctIq: bool,
decimation: int,
**kwargs):
if not vfos or len(vfos) < 1:
raise ValueError("simo mode cannot be used without the vfos option")
super().__init__(fs=fs,
centerFreq=centerFreq,
omegaOut=omegaOut,
tunedFreq=tunedFreq,
vfos=vfos,
correctIq=correctIq,
decimation=decimation, **kwargs)

namedPipes = []
for i in range(n):
name = "/tmp/pipe-" + str(i)
os.mkfifo(name)
namedPipes.append((name, open(name, 'wb', os.O_WRONLY | os.O_NONBLOCK)))
def processVfoChunk(self, y, freq) -> np.ndarray[any, np.real] | None:
try:
y = shiftFreq(y, freq, self.decimatedFs)
y = signal.sosfilt(self.sosIn, y)
y = self.demod(y)
return applyFilters(y, self.outputFilters)
except KeyboardInterrupt:
return None

def processData(self, isDead: Value, pipe: Pipe, _) -> None:
inReader, inWriter = pipe
outReader, outWriter = outPipe = Pipe(False)
try:
with Pool(maxtasksperchild=128) as pool:
results = []
while not isDead.value:
writer.close()
y = reader.recv()
if y is None or len(y) < 1:
break
y = deinterleave(y)
if self.correctIq is not None:
y = self.correctIq.correctIq(y)
y = shiftFreq(y, self.centerFreq, self.fs)
y = signal.decimate(y, self.decimation, ftype='fir')
[r.get() for r in results] # wait for any prior processing to complete
results = [pool.apply_async(self.handleOutput, (file.fileno(), freq, y),
error_callback=eprint) for
(name, file), freq in zip(namedPipes, self.vfos)]
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as listenerSckt:
with PipeReceiver(outPipe, len(self.vfos)) as recvSckt:
with Pool(initializer=initializer, initargs=(isDead,)) as pool:

server = OutputServer(host='0.0.0.0')
lt, ft = server.initServer(recvSckt, listenerSckt, isDead)
ft.start()
lt.start()

eprint(f'\nAccepting connections on port {server.port}\n')
while not isDead.value:
inWriter.close()
y = inReader.recv()
if y is None or len(y) < 1:
break

y = deinterleave(y)
if self.correctIq is not None:
y = self.correctIq.correctIq(y)
y = shiftFreq(y, self.centerFreq, self.fs)

y = signal.decimate(y, self.decimation, ftype='fir')
results = pool.map_async(partial(self.processVfoChunk, y), self.vfos)
[outWriter.send(r) for r in results.get()]
except (EOFError, KeyboardInterrupt, BrokenPipeError):
pass
except Exception as e:
printException(e)
finally:
isDead.value = 1
pool.close()
pool.join()
del pool
for n, fd in namedPipes:
applyIgnoreException(partial(os.write, fd.fileno(), b''))
applyIgnoreException(partial(os.close, fd.fileno()))
os.unlink(n)
reader.close()
print(f'File writer halted')
inReader.close()
outReader.close()
outWriter.close()
closeSocket(listenerSckt)
print(f'Multi-VFO writer halted')
34 changes: 34 additions & 0 deletions src/misc/general_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import signal as s
import socket
import sys
import traceback
from multiprocessing import Condition
from numbers import Number
from typing import Callable

Expand Down Expand Up @@ -80,3 +83,34 @@ def traceOn():

def poolErrorCallback(value):
eprint(value)


def initializer(isDead: Condition):
def handleSignal(_, __):
isDead.value = 1
# raise KeyboardInterrupt
s.signal(s.SIGINT, handleSignal)


def prevent_out_of_context_execution(method):
def decorator(self, *args, **kwargs):
if not self._inside_context:
raise AttributeError(f"{method.__name__} may only be invoked from inside context.")
return method(self, *args, **kwargs)

return decorator


def remove_context(method):
def decorator(self, *args, **kwargs):
self._inside_context = False
self._barrier.abort()
return method(self, *args, **kwargs)

return decorator


def closeSocket(clientSckt):
applyIgnoreException(lambda: clientSckt.send(b''))
applyIgnoreException(lambda: clientSckt.shutdown(socket.SHUT_RDWR))
applyIgnoreException(clientSckt.close)
Loading