diff --git a/unified_frameworks/sensor_array/fake_lidar.py b/unified_frameworks/sensor_array/fake_lidar.py index 844e022..f69becf 100644 --- a/unified_frameworks/sensor_array/fake_lidar.py +++ b/unified_frameworks/sensor_array/fake_lidar.py @@ -5,33 +5,51 @@ """ import numpy as np import time -try: - from sensor_array.LidarClass import Lidar -except ModuleNotFoundError: - import sys - import re - sys.path.append((next(re.finditer(".*unified_frameworks", __file__)).group())) - from sensor_array.LidarClass import Lidar +import sys +import re +sys.path.append((next(re.finditer(".*unified_frameworks", __file__)).group())) +from sensor_array.LidarClass import Lidar +from unified_utils import Service, polar_sum config = { "verbose":False } class FakeLidar(Lidar): - def __init__(self, empty_scans=False) -> None: - self.n = 30 - self.scan = np.random.rand(self.n, 3)*1_000 +2_000 + def __init__(self, points=100, angular_rate=1, translational_rate=1, jitter=0, noise=0, empty_scans=False) -> None: + self.n = points + angles = np.linspace(0, 360, self.n) + distances = [5_000] + while len(distances) < self.n: + shift = 0.5 # if np.random.rand() < 1 else 1 + distances.append(distances[-1]+(np.random.randn()*shift*1_000)) + quality = [15]*self.n + self.scan = np.stack([quality, angles, distances], 1) + # self.scan = np.random.rand(self.n, 3)*1_000 +2_000 self.empty_scans = empty_scans + self.noise = noise + def update(is_alive): + while is_alive(): + self.scan[:, 1]+=(0.7+np.random.randn()*jitter)*angular_rate + drift = (np.pi/2, 10*translational_rate*(np.e**(jitter*np.random.randn()))) + self.scan[:, 1] = np.deg2rad(self.scan[:, 1]) + self.scan[:, [1,2]] = [polar_sum(i, drift) for i in self.scan[:, [1,2]]] + self.scan[:, 1] = np.rad2deg(self.scan[:, 1]) + time.sleep(1/10) + self.s = Service(update, "Fake Lidar Generator") pass def connect(self, max_attempts=3, wait_seconds=1, verbose_attempts=True) -> bool: + self.s.start_service() return True def get_measures(self): if self.empty_scans: return [] - self.scan += (0,1,0) - res = self.scan + (np.random.rand(self.n, 3)-0.5)*(0,0,400) + # self.scan += (0,1,0) + res = self.scan + (np.random.rand(self.n, 3)-0.5)*(0,0,400)*self.noise print(res) if config["verbose"] else None return res + # return self.scan def disconnect(self): + self.s.stop_service() pass if __name__=='__main__': diff --git a/unified_frameworks/sensor_array/server_lidar.py b/unified_frameworks/sensor_array/server_lidar.py index 83aa927..ff5d639 100755 --- a/unified_frameworks/sensor_array/server_lidar.py +++ b/unified_frameworks/sensor_array/server_lidar.py @@ -39,7 +39,6 @@ def default(self, obj): else: print(f"[SERVER] Device port {port} found!") lidar = ActualLidar() -lidar.connect() clients = [] # @note List of clients connected in the server buffer = None @@ -96,6 +95,7 @@ async def startServer(): if __name__ == "__main__": try: print("[SERVER] Server ON") + lidar.connect() asyncio.run(startServer()) except KeyboardInterrupt: print("[SERVER] Keyboard Interrupt occurred!")