Skip to content
This repository has been archived by the owner on May 4, 2024. It is now read-only.

Updated FakeLidar #116

Merged
merged 3 commits into from
Apr 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 30 additions & 12 deletions unified_frameworks/sensor_array/fake_lidar.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,51 @@
"""
import numpy as np
import time
try:
from sensor_array.LidarClass import Lidar
except ModuleNotFoundError:
import sys
import re
sys.path.append((next(re.finditer(".*unified_frameworks", __file__)).group()))
from sensor_array.LidarClass import Lidar
import sys
import re
sys.path.append((next(re.finditer(".*unified_frameworks", __file__)).group()))
from sensor_array.LidarClass import Lidar
from unified_utils import Service, polar_sum

config = {
"verbose":False
}
class FakeLidar(Lidar):
def __init__(self, empty_scans=False) -> None:
self.n = 30
self.scan = np.random.rand(self.n, 3)*1_000 +2_000
def __init__(self, points=100, angular_rate=1, translational_rate=1, jitter=0, noise=0, empty_scans=False) -> None:
self.n = points
angles = np.linspace(0, 360, self.n)
distances = [5_000]
while len(distances) < self.n:
shift = 0.5 # if np.random.rand() < 1 else 1
distances.append(distances[-1]+(np.random.randn()*shift*1_000))
quality = [15]*self.n
self.scan = np.stack([quality, angles, distances], 1)
# self.scan = np.random.rand(self.n, 3)*1_000 +2_000
self.empty_scans = empty_scans
self.noise = noise
def update(is_alive):
while is_alive():
self.scan[:, 1]+=(0.7+np.random.randn()*jitter)*angular_rate
drift = (np.pi/2, 10*translational_rate*(np.e**(jitter*np.random.randn())))
self.scan[:, 1] = np.deg2rad(self.scan[:, 1])
self.scan[:, [1,2]] = [polar_sum(i, drift) for i in self.scan[:, [1,2]]]
self.scan[:, 1] = np.rad2deg(self.scan[:, 1])
time.sleep(1/10)
self.s = Service(update, "Fake Lidar Generator")
pass
def connect(self, max_attempts=3, wait_seconds=1, verbose_attempts=True) -> bool:
self.s.start_service()
return True
def get_measures(self):
if self.empty_scans:
return []
self.scan += (0,1,0)
res = self.scan + (np.random.rand(self.n, 3)-0.5)*(0,0,400)
# self.scan += (0,1,0)
res = self.scan + (np.random.rand(self.n, 3)-0.5)*(0,0,400)*self.noise
print(res) if config["verbose"] else None
return res
# return self.scan
def disconnect(self):
self.s.stop_service()
pass

if __name__=='__main__':
Expand Down
2 changes: 1 addition & 1 deletion unified_frameworks/sensor_array/server_lidar.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ def default(self, obj):
else:
print(f"[SERVER] Device port {port} found!")
lidar = ActualLidar()
lidar.connect()

clients = [] # @note List of clients connected in the server
buffer = None
Expand Down Expand Up @@ -96,6 +95,7 @@ async def startServer():
if __name__ == "__main__":
try:
print("[SERVER] Server ON")
lidar.connect()
asyncio.run(startServer())
except KeyboardInterrupt:
print("[SERVER] Keyboard Interrupt occurred!")
Expand Down