diff --git a/src/cariboulite_print.py b/src/cariboulite_print.py
deleted file mode 100644
index d4392ab..0000000
--- a/src/cariboulite_print.py
+++ /dev/null
@@ -1,84 +0,0 @@
-#!/usr/bin/env python3
-#
-# This file is part of the sdrterm distribution
-# (https://github.com/peads/sdrterm).
-# Copyright (c) 2023-2024 Patrick Eads.
-# With code from the cariboulite distribution
-# (https://github.com/cariboulabs/cariboulite)
-# Copyright (c) 2024 CaribouLabs.co
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, version 3.
-#
-# This program is distributed in the hope that it will be useful, but
-# WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see .
-#
-from ctypes import cdll, c_byte, c_int, create_string_buffer, Structure, pointer, c_bool, c_float, \
- c_size_t
-from typing import Annotated
-import typer
-
-
-def main(force_program: Annotated[bool, typer.Option(help='Force re-programming of FPGA')] = False,
- verbose: Annotated[
- int, typer.Option('--verbose', '-v', help='Verbosity setting for libcaribou')] = 2):
- libcar = cdll.LoadLibrary("libcariboulite.so")
-
- hw_ver = c_byte(-1)
- hw_name = create_string_buffer(128)
- hw_uuid = create_string_buffer(128)
-
- detected = libcar.cariboulite_detect_connected_board(pointer(hw_ver), hw_name, hw_uuid)
-
- print(
- f'Detection: {detected}, HWVer: {hw_ver.value}, HWName: {hw_name.value}, UUID: {hw_uuid.value}')
-
- # typedef struct
- # {
- # int major_version;
- # int minor_version;
- # int revision;
- # } cariboulite_lib_version_st;
-
- class cariboulite_lib_version_st(Structure):
- pass
-
- cariboulite_lib_version_st._fields_ = [('major_version', c_int), ('minor_version', c_int),
- ('revision', c_int)]
- version = cariboulite_lib_version_st()
-
- libcar.cariboulite_get_lib_version(pointer(version))
-
- print('Version: %02d.%02d.%02d' % (
- version.major_version, version.minor_version, version.revision))
-
- forceProgram = c_bool(force_program)
- verbosity = c_byte(verbose)
- libcar.cariboulite_init(forceProgram, verbosity)
- serial_number = libcar.cariboulite_get_sn()
-
- print('Serial Number: %08X' % (serial_number))
-
- ch_name = create_string_buffer(64)
- low_freq_vec = (c_float * 3)()
- high_freq_vec = (c_float * 3)()
-
- for ch in range(1):
- libcar.cariboulite_get_channel_name(c_int(ch), ch_name, c_size_t(len(ch_name)))
- ch_num_ranges = libcar.cariboulite_get_num_frequency_ranges(ch)
- print(f'Channel: %d, Name: %s, Num. Freq. Ranges: %d' % (ch, ch_name.value, ch_num_ranges))
- libcar.cariboulite_get_frequency_limits(ch, low_freq_vec, high_freq_vec, None)
- for i in range(ch_num_ranges):
- print(f'\tRange %d: [%.2f, %.2f]' % (i, low_freq_vec[i], high_freq_vec[i]))
-
- libcar.cariboulite_close()
-
-
-if __name__ == "__main__":
- typer.run(main)
diff --git a/src/dsp/dsp_processor.py b/src/dsp/dsp_processor.py
index 0b57f40..3910a0d 100644
--- a/src/dsp/dsp_processor.py
+++ b/src/dsp/dsp_processor.py
@@ -17,13 +17,14 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
#
+import itertools
import json
import os
-import signal as s
import struct
import sys
from functools import partial
-from multiprocessing import Pool
+from multiprocessing import Pool, Value, Pipe
+from typing import Callable
import numpy as np
from scipy import signal
@@ -32,8 +33,8 @@
from dsp.demodulation import amDemod, fmDemod, realOutput
from dsp.iq_correction import IQCorrection
from dsp.util import applyFilters, generateBroadcastOutputFilter, generateFmOutputFilters, shiftFreq
-from misc.general_util import applyIgnoreException, deinterleave, eprint, printException, tprint, \
- vprint
+from misc.general_util import applyIgnoreException, deinterleave, printException, tprint, \
+ vprint, initializer
def handleException(isDead, e):
@@ -49,21 +50,21 @@ def __init__(self,
fs: int,
centerFreq: float,
omegaOut: int,
- demod: str = None,
- tunedFreq: int = None,
- vfos: str = None,
- correctIq: bool = False,
- decimation: int = 1,
+ tunedFreq: int,
+ vfos: str,
+ correctIq: bool,
+ decimation: int,
+ demod: Callable[[np.ndarray[any, np.complex_]], np.ndarray] = realOutput,
**_):
- decimation = decimation if decimation is not None else 2
+ # decimation = decimation if decimation is not None else 2
self.outputFilters = []
self.sosIn = None
self.__decimatedFs = self.__fs = fs
self.__decimationFactor = decimation
self.__decimatedFs //= decimation
self.centerFreq = centerFreq
- self.demod = demod if demod is not None else realOutput
+ self.demod = demod
self.bandwidth = None
self.tunedFreq = tunedFreq
self.vfos = vfos
@@ -142,7 +143,7 @@ def selectOuputAm(self):
generateBroadcastOutputFilter(self.__decimatedFs, self._FILTER_DEGREE)]
self.setDemod(amDemod)
- def processChunk(self, y):
+ def processChunk(self, y: list) -> np.ndarray[any, np.real] | None:
try:
y = deinterleave(y)
@@ -158,37 +159,34 @@ def processChunk(self, y):
y = self.demod(y)
y = applyFilters(y, self.outputFilters)
- return signal.savgol_filter(y, 14, self._FILTER_DEGREE)
+ return y
except KeyboardInterrupt:
- pass
- except Exception as e:
- eprint(f'Chunk processing encountered: {e}')
- printException(e)
- return None
+ return None
- def processData(self, isDead, pipe, f) -> None:
+ def processData(self, isDead: Value, pipe: Pipe, f) -> None:
reader, writer = pipe
- if 'posix' in os.name:
- s.signal(s.SIGINT, s.SIG_IGN) # https://stackoverflow.com/a/68695455/8372013
+ data = []
try:
with open(f, 'wb') if f is not None else open(sys.stdout.fileno(), 'wb',
closefd=False) as file:
tprint(f'{f} {file}')
- with Pool(maxtasksperchild=128) as pool:
- data = []
+ with Pool(initializer=initializer, initargs=(isDead,)) as pool:
+ ii = range(os.cpu_count())
while not isDead.value:
writer.close()
- for _ in range(self.__decimationFactor):
+ for _ in ii:
y = reader.recv()
- if y is None or len(y) < 1:
+ if y is None or not len(y):
break
data.append(y)
- if data is None or len(data) < 1:
+ if data is None or not len(data):
break
- y = np.array(pool.map_async(self.processChunk, data).get()).flatten()
+ y = pool.map_async(self.processChunk, data)
+ y = list(itertools.chain.from_iterable(y.get()))
+ y = signal.savgol_filter(y, 14, self._FILTER_DEGREE)
file.write(struct.pack(len(y) * 'd', *y))
data.clear()
except (EOFError, KeyboardInterrupt, BrokenPipeError):
diff --git a/src/dsp/vfo_processor.py b/src/dsp/vfo_processor.py
index 08d53ea..3504a1d 100644
--- a/src/dsp/vfo_processor.py
+++ b/src/dsp/vfo_processor.py
@@ -1,69 +1,100 @@
-import os
-import signal as s
-import struct
+import socket
+from contextlib import closing
from functools import partial
-from multiprocessing import Pool
+from multiprocessing import Pool, Pipe, Value
+from threading import Barrier
+import numpy as np
from scipy import signal
from dsp.dsp_processor import DspProcessor
from dsp.util import applyFilters, shiftFreq
-from misc.general_util import applyIgnoreException, deinterleave, eprint, printException
+from misc.general_util import deinterleave, eprint, printException, initializer, closeSocket
+from sdr.output_server import OutputServer, Receiver
-class VfoProcessor(DspProcessor):
+class PipeReceiver(Receiver):
+ def __init__(self, pipe: Pipe, vfos: int):
+ receiver, self.__writer = pipe
+ super().__init__(receiver, Barrier(vfos + 1)) # +1 for receiver thread
+
+ def __exit__(self, *ex):
+ self.__writer.close()
+ self._receiver.close()
+
+ def receive(self):
+ if not self._barrier.broken:
+ self._barrier.wait()
+ self._barrier.abort()
+ return self.receiver.recv()
- def handleOutput(self, file, freq, y):
- y = shiftFreq(y, freq, self.decimatedFs)
- y = signal.sosfilt(self.sosIn, y)
- y = self.demod(y)
- y = applyFilters(y, self.outputFilters)
- return os.write(file, struct.pack(len(y) * 'd', *y))
- def processData(self, isDead, pipe, f) -> None:
- if 'posix' in os.name:
- s.signal(s.SIGINT, s.SIG_IGN) # https://stackoverflow.com/a/68695455/8372013
- if f is None or (isinstance(f, str)) and len(f) < 1 \
- or self.demod is None:
- raise ValueError('f is not defined')
- reader, writer = pipe
- n = len(self.vfos)
+class VfoProcessor(DspProcessor):
+
+ def __init__(self,
+ fs: int,
+ centerFreq: float,
+ omegaOut: int,
+ tunedFreq: int,
+ vfos: str,
+ correctIq: bool,
+ decimation: int,
+ **kwargs):
+ if not vfos or len(vfos) < 1:
+ raise ValueError("simo mode cannot be used without the vfos option")
+ super().__init__(fs=fs,
+ centerFreq=centerFreq,
+ omegaOut=omegaOut,
+ tunedFreq=tunedFreq,
+ vfos=vfos,
+ correctIq=correctIq,
+ decimation=decimation, **kwargs)
- namedPipes = []
- for i in range(n):
- name = "/tmp/pipe-" + str(i)
- os.mkfifo(name)
- namedPipes.append((name, open(name, 'wb', os.O_WRONLY | os.O_NONBLOCK)))
+ def processVfoChunk(self, y, freq) -> np.ndarray[any, np.real] | None:
+ try:
+ y = shiftFreq(y, freq, self.decimatedFs)
+ y = signal.sosfilt(self.sosIn, y)
+ y = self.demod(y)
+ return applyFilters(y, self.outputFilters)
+ except KeyboardInterrupt:
+ return None
+ def processData(self, isDead: Value, pipe: Pipe, _) -> None:
+ inReader, inWriter = pipe
+ outReader, outWriter = outPipe = Pipe(False)
try:
- with Pool(maxtasksperchild=128) as pool:
- results = []
- while not isDead.value:
- writer.close()
- y = reader.recv()
- if y is None or len(y) < 1:
- break
- y = deinterleave(y)
- if self.correctIq is not None:
- y = self.correctIq.correctIq(y)
- y = shiftFreq(y, self.centerFreq, self.fs)
- y = signal.decimate(y, self.decimation, ftype='fir')
- [r.get() for r in results] # wait for any prior processing to complete
- results = [pool.apply_async(self.handleOutput, (file.fileno(), freq, y),
- error_callback=eprint) for
- (name, file), freq in zip(namedPipes, self.vfos)]
+ with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as listenerSckt:
+ with PipeReceiver(outPipe, len(self.vfos)) as recvSckt:
+ with Pool(initializer=initializer, initargs=(isDead,)) as pool:
+
+ server = OutputServer(host='0.0.0.0')
+ lt, ft = server.initServer(recvSckt, listenerSckt, isDead)
+ ft.start()
+ lt.start()
+
+ eprint(f'\nAccepting connections on port {server.port}\n')
+ while not isDead.value:
+ inWriter.close()
+ y = inReader.recv()
+ if y is None or len(y) < 1:
+ break
+
+ y = deinterleave(y)
+ if self.correctIq is not None:
+ y = self.correctIq.correctIq(y)
+ y = shiftFreq(y, self.centerFreq, self.fs)
+
+ y = signal.decimate(y, self.decimation, ftype='fir')
+ results = pool.map_async(partial(self.processVfoChunk, y), self.vfos)
+ [outWriter.send(r) for r in results.get()]
except (EOFError, KeyboardInterrupt, BrokenPipeError):
pass
except Exception as e:
printException(e)
finally:
isDead.value = 1
- pool.close()
- pool.join()
- del pool
- for n, fd in namedPipes:
- applyIgnoreException(partial(os.write, fd.fileno(), b''))
- applyIgnoreException(partial(os.close, fd.fileno()))
- os.unlink(n)
- reader.close()
- print(f'File writer halted')
+ inReader.close()
+ outReader.close()
+ outWriter.close()
+ closeSocket(listenerSckt)
+ print(f'Multi-VFO writer halted')
diff --git a/src/misc/general_util.py b/src/misc/general_util.py
index d87b9be..73a183e 100644
--- a/src/misc/general_util.py
+++ b/src/misc/general_util.py
@@ -17,8 +17,11 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
#
+import signal as s
+import socket
import sys
import traceback
+from multiprocessing import Condition
from numbers import Number
from typing import Callable
@@ -80,3 +83,34 @@ def traceOn():
def poolErrorCallback(value):
eprint(value)
+
+
+def initializer(isDead: Condition):
+ def handleSignal(_, __):
+ isDead.value = 1
+ # raise KeyboardInterrupt
+ s.signal(s.SIGINT, handleSignal)
+
+
+def prevent_out_of_context_execution(method):
+ def decorator(self, *args, **kwargs):
+ if not self._inside_context:
+ raise AttributeError(f"{method.__name__} may only be invoked from inside context.")
+ return method(self, *args, **kwargs)
+
+ return decorator
+
+
+def remove_context(method):
+ def decorator(self, *args, **kwargs):
+ self._inside_context = False
+ self._barrier.abort()
+ return method(self, *args, **kwargs)
+
+ return decorator
+
+
+def closeSocket(clientSckt):
+ applyIgnoreException(lambda: clientSckt.send(b''))
+ applyIgnoreException(lambda: clientSckt.shutdown(socket.SHUT_RDWR))
+ applyIgnoreException(clientSckt.close)
diff --git a/src/misc/io_args.py b/src/misc/io_args.py
index 91de882..1665206 100644
--- a/src/misc/io_args.py
+++ b/src/misc/io_args.py
@@ -90,12 +90,7 @@ def initParameters(cls):
@classmethod
def initIOHandlers(cls):
- if not cls.simo:
- processor = DspProcessor
- else:
- if not (cls.vfos and hasattr(os, 'mkfifo')):
- raise ValueError("simo mode cannot be used without the vfos option")
- processor = VfoProcessor
+ processor = DspProcessor if not cls.simo else VfoProcessor
cls.processor = processor = processor(decimation=cls.dec,
centerFreq=cls.center,
tunedFreq=cls.tuned,
diff --git a/src/plots/multi_vfo_plot.py b/src/plots/multi_vfo_plot.py
index 42bcc9c..19071fe 100644
--- a/src/plots/multi_vfo_plot.py
+++ b/src/plots/multi_vfo_plot.py
@@ -17,8 +17,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
#
-import os
-import signal as s
from functools import partial
from itertools import chain
from multiprocessing import Pool
@@ -29,6 +27,7 @@
from scipy import fft
from dsp.util import shiftFreq
+from misc.general_util import initializer
from plots.abstract_plot import Plot
@@ -39,7 +38,7 @@ def __init__(self, **kwargs):
if self.vfos is None:
raise ValueError('vfos not specified')
self.axes = None
- self.pool = None
+ self._pool = None
def initPlot(self):
super().initPlot()
@@ -88,23 +87,17 @@ def initPlot(self):
@staticmethod
def shiftVfos(y, fs, freq):
- return shiftFreq(y, freq, fs)
+ try:
+ return shiftFreq(y, freq, fs)
+ except KeyboardInterrupt:
+ return None
- def close(self):
- super().close()
- if hasattr(self, 'pool'):
- self.pool.close()
- self.pool.join()
- del self.pool
+ def processData(self, isDead, pipe, ex=None) -> None:
+ with Pool(initializer=initializer, initargs=(isDead,)) as self._pool:
+ super().processData(isDead, pipe, ex)
def animate(self, y):
-
- if not self.isInit:
- if 'posix' in os.name:
- s.signal(s.SIGINT, s.SIG_IGN) # https://stackoverflow.com/a/68695455/8372013
- self.pool = Pool(maxtasksperchild=128)
-
- shift = self.pool.map_async(partial(self.shiftVfos, y, self.fs), self.vfos)
+ shift = self._pool.map_async(partial(self.shiftVfos, y, self.fs), self.vfos)
fftData = fft.fft(shift.get(), norm='forward')
fftData = fft.fftshift(fftData)
amps = np.abs(fftData)
diff --git a/src/rtltcp.py b/src/rtltcp.py
index f58c02c..079d63f 100644
--- a/src/rtltcp.py
+++ b/src/rtltcp.py
@@ -21,35 +21,59 @@
import socket
from contextlib import closing
from multiprocessing import Value
-from threading import Condition
from typing import Annotated
import typer
-from sdr.control_rtl_tcp import UnrecognizedInputError
-from misc.general_util import applyIgnoreException, printException
+from misc.general_util import printException, eprint, closeSocket
from sdr.control_rtl_tcp import ControlRtlTcp
-from sdr.output_server import OutputServer
+from sdr.control_rtl_tcp import UnrecognizedInputError
+from sdr.output_server import OutputServer, Receiver
from sdr.rtl_tcp_commands import RtlTcpCommands
+class __SocketReceiver(Receiver):
+
+ def __init__(self, writeSize=262144, readSize=8192):
+ super().__init__(socket.socket(socket.AF_INET, socket.SOCK_STREAM))
+ self.readSize = readSize
+ self.data = bytearray()
+ self.chunks = range(writeSize // readSize)
+
+ def __exit__(self, *ex):
+ closeSocket(self._receiver)
+
+ def receive(self):
+ if not self._barrier.broken:
+ self._barrier.wait()
+ self._barrier.abort()
+ for _ in self.chunks:
+ try:
+ inp = self.receiver.recv(self.readSize)
+ except BrokenPipeError:
+ return b''
+ if inp is None or not len(inp):
+ break
+ self.data.extend(inp)
+ result = bytes(self.data)
+ self.data.clear()
+ return result
+
+
def main(host: Annotated[str, typer.Argument(help='Address of remote rtl_tcp server')],
port: Annotated[int, typer.Argument(help='Port of remote rtl_tcp server')]) -> None:
- with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as signalSckt:
- with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as outputSckt:
- signalSckt.connect((host, port))
- cmdr = ControlRtlTcp(signalSckt)
+ with __SocketReceiver() as recvSckt:
+ with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as listenerSckt:
+ recvSckt.receiver.settimeout(1)
+ recvSckt.receiver.connect((host, port))
+ cmdr = ControlRtlTcp(recvSckt.receiver)
isDead = Value('b', 0)
isDead.value = 0
- isConnected = Condition()
server = OutputServer(host='0.0.0.0')
- with isConnected:
- st, pt = server.initServer(signalSckt, outputSckt, isConnected, isDead)
- st.start()
- isConnected.wait()
- pt.start()
- del isConnected
+ st, pt = server.initServer(recvSckt, listenerSckt, isDead)
+ pt.start()
+ st.start()
try:
while not isDead.value:
@@ -77,12 +101,13 @@ def main(host: Annotated[str, typer.Argument(help='Address of remote rtl_tcp ser
print(f'ERROR: Input invalid: {cmd}: {param}. Please try again')
except (UnrecognizedInputError, ValueError, KeyError) as ex:
print(f'ERROR: Input invalid: {ex}. Please try again')
+ except (ConnectionResetError, ConnectionAbortedError):
+ eprint(f'Connection lost')
except Exception as e:
printException(e)
finally:
isDead.value = 1
- applyIgnoreException(lambda: signalSckt.shutdown(socket.SHUT_RDWR))
- applyIgnoreException(lambda: outputSckt.shutdown(socket.SHUT_RDWR))
+ closeSocket(listenerSckt)
st.join(0.1)
pt.join(0.1)
print('UI halted')
diff --git a/src/sdr/control_rtl_tcp.py b/src/sdr/control_rtl_tcp.py
index 4acf427..5e5b6c8 100644
--- a/src/sdr/control_rtl_tcp.py
+++ b/src/sdr/control_rtl_tcp.py
@@ -18,6 +18,7 @@
# along with this program. If not, see .
#
from struct import error as StructError, pack
+
from sdr.rtl_tcp_commands import RtlTcpCommands
diff --git a/src/sdr/output_server.py b/src/sdr/output_server.py
index 70815aa..c89e846 100644
--- a/src/sdr/output_server.py
+++ b/src/sdr/output_server.py
@@ -20,93 +20,131 @@
import multiprocessing
import os
import socket
-from multiprocessing import Value
+from abc import ABC, abstractmethod
+from concurrent.futures import ThreadPoolExecutor
+from multiprocessing import Value, Queue
from queue import Empty
-from threading import Condition, Thread
+from threading import Thread, Barrier, BrokenBarrierError
-from misc.general_util import applyIgnoreException, printException
+from misc.general_util import printException, eprint, prevent_out_of_context_execution, \
+ remove_context, closeSocket
from sdr.util import findPort
+class Receiver(ABC):
+
+ def __init__(self, receiver, barrier=Barrier(2)):
+ self._receiver = receiver
+ self._barrier = barrier
+ self._inside_context = False
+
+ def __enter__(self):
+ self._inside_context = True
+ return self
+
+ @remove_context
+ @abstractmethod
+ def __exit__(self, *exc):
+ pass
+
+ @property
+ @prevent_out_of_context_execution
+ def barrier(self):
+ return self._barrier
+
+ @barrier.setter
+ @prevent_out_of_context_execution
+ def barrier(self, _):
+ raise NotImplemented("Receiver does not allow setting barrier")
+
+ @barrier.deleter
+ @prevent_out_of_context_execution
+ def barrier(self):
+ del self._barrier
+
+ @property
+ @prevent_out_of_context_execution
+ def receiver(self):
+ return self._receiver
+
+ @receiver.setter
+ @prevent_out_of_context_execution
+ def receiver(self, _):
+ raise NotImplemented("Receiver does not allow setting barrier")
+
+ @receiver.deleter
+ @prevent_out_of_context_execution
+ def receiver(self):
+ del self._receiver
+
+ @abstractmethod
+ @prevent_out_of_context_execution
+ def receive(self):
+ pass
+
+
class OutputServer:
- def __init__(self,
- host='localhost',
- port=findPort(),
- writeSize=262144):
+ def __init__(self, host='localhost', port=findPort()):
self.host = host
self.port = port
- self.clients: multiprocessing.Queue = multiprocessing.Queue(maxsize=os.cpu_count())
- self.writeSize = writeSize
+ self.clients: Queue = multiprocessing.Queue(maxsize=os.cpu_count())
def close(self, exitFlag: Value) -> None:
- if exitFlag.value:
+ exitFlag.value = 1
+ while 1:
try:
- exitFlag.value = 1
-
- for i in range(self.clients.qsize()):
- try:
- clientSckt = self.clients.get_nowait()
- clientSckt.send(b'')
- clientSckt.shutdown(socket.SHUT_RDWR)
- clientSckt.close()
- except Empty:
- break
- finally:
- self.clients.close()
- del self.clients
+ clientSckt = self.clients.get_nowait()
+ closeSocket(clientSckt)
+ except FileNotFoundError:
+ pass
+ except (Empty, ValueError, OSError, TypeError):
+ break
except Exception as e:
printException(e)
+ break
+ self.clients.close()
+ self.clients.join_thread()
- def feedClients(self, recvSckt: socket.socket, exitFlag: Value) -> None:
+ def feed(self, recvSckt: Receiver, exitFlag: Value) -> None:
processingList = []
- ii = range(self.writeSize >> 13)
try:
- data = bytearray()
while not exitFlag.value:
- for _ in ii:
- inp = recvSckt.recv(8192)
- if inp is None or not len(inp):
- break
- data.extend(inp)
try:
while not exitFlag.value:
clientSckt = self.clients.get_nowait()
try:
- clientSckt.sendall(data)
+ clientSckt.sendall(recvSckt.receive())
processingList.append(clientSckt)
except (ConnectionAbortedError, BlockingIOError, ConnectionResetError,
- ConnectionAbortedError, EOFError, BrokenPipeError) as e:
- applyIgnoreException(lambda: clientSckt.shutdown(socket.SHUT_RDWR))
- clientSckt.close()
- print(f'Client disconnected {e}')
+ ConnectionAbortedError, EOFError, BrokenPipeError, BrokenBarrierError) as e:
+ closeSocket(clientSckt)
+ eprint(f'Client disconnected {e}')
except Empty:
pass
for c in processingList:
self.clients.put(c)
-
processingList.clear()
- data.clear()
- except (EOFError, ConnectionResetError, ConnectionAbortedError):
+ except (ValueError, OSError, EOFError, ConnectionResetError, ConnectionAbortedError):
pass
except Exception as e:
printException(e)
finally:
self.close(exitFlag)
- print('Consumer halted')
+ print('Feeder halted')
- def listen(self, serverSckt: socket.socket, isConnected: Condition, exitFlag: Value) -> None:
- with isConnected:
- serverSckt.bind((self.host, self.port))
- serverSckt.listen(1)
- isConnected.notify()
+ def listen(self, listenerSckt: socket.socket, isConnected: Barrier, exitFlag: Value) -> None:
+ listenerSckt.bind((self.host, self.port))
+ listenerSckt.listen(1)
try:
- while not exitFlag.value:
- (clientSckt, address) = serverSckt.accept()
- # cs.setblocking(False)
- print(f'Connection request from: {address}')
- self.clients.put(clientSckt)
+ with ThreadPoolExecutor(max_workers=isConnected.parties) as pool:
+ while not exitFlag.value:
+ (clientSckt, address) = listenerSckt.accept()
+ eprint(f'Connection request from: {address}')
+ self.clients.put(clientSckt)
+ if not isConnected.broken:
+ pool.submit(isConnected.wait)
except OSError:
pass
except Exception as ex:
@@ -118,10 +156,9 @@ def listen(self, serverSckt: socket.socket, isConnected: Condition, exitFlag: Va
print('Listener halted')
def initServer(self,
- serverSckt: socket.socket,
+ recvSckt: Receiver,
listenerSckt: socket.socket,
- isConnected: Condition,
exitFlag: Value) -> (Thread, Thread):
- st = Thread(target=self.listen, args=(listenerSckt, isConnected, exitFlag))
- pt = Thread(target=self.feedClients, args=(serverSckt, exitFlag))
- return st, pt
+ listenerThread = Thread(target=self.listen, args=(listenerSckt, recvSckt.barrier, exitFlag))
+ feedThread = Thread(target=self.feed, args=(recvSckt, exitFlag))
+ return listenerThread, feedThread
diff --git a/src/sdrterm.py b/src/sdrterm.py
index 807ae4f..3342bbe 100644
--- a/src/sdrterm.py
+++ b/src/sdrterm.py
@@ -48,12 +48,12 @@ def main(fs: Annotated[int, typer.Option('--sampling-rate', '--fs', show_default
bits: Annotated[int, typer.Option('--bits-per-sample', '-b', help='Bits per sample (ignored if wav file)')] = None,
enc: Annotated[str, typer.Option('--encoding', '-e', help='Binary encoding (ignored if wav file)')] = None,
normalize: Annotated[bool, typer.Option(help='Toggle normalizing input analytic signal')] = False,
- omegaOut: Annotated[int, typer.Option('--omega-out', '-m', help='Cutoff frequency in Hz')] = 9500,
+ omegaOut: Annotated[int, typer.Option('--omega-out', '-m', help='Cutoff frequency in Hz')] = 12500,
correct_iq: Annotated[bool, typer.Option(help='Toggle iq correction')] = False,
simo: Annotated[bool, typer.Option(help='EXPERIMENTAL enable using named pipes to output data processed from multiple channels specified by the vfos option')] = False,
verbose: Annotated[bool, typer.Option('--verbose', '-v', help='Toggle verbose output')] = False,
trace: Annotated[bool, typer.Option(help='Toggle extra verbose output')] = False,
- read_size: Annotated[int, typer.Option(help='Size in bytes read per iteration')] = 8192):
+ read_size: Annotated[int, typer.Option(help='Size in bytes read per iteration')] = 65536):
processes: dict[UUID, Process] = {}
pipes: dict[UUID, Pipe] = {}
@@ -98,10 +98,11 @@ def main(fs: Annotated[int, typer.Option('--sampling-rate', '--fs', show_default
finally:
isDead.value = 1
for proc in processes.values():
- proc.join()
- proc.close()
+ proc.terminate()
print('Main halted')
if __name__ == '__main__':
+ # if 'spawn' in get_all_start_methods():
+ # set_start_method('spawn')
typer.run(main)