-
Notifications
You must be signed in to change notification settings - Fork 0
/
simulation.py
140 lines (103 loc) · 4.54 KB
/
simulation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import numpy as np
from vehicle_container import VehicleContainer as Container
from vehicle import Vehicle, HumanVehicle, Car, Truck, AutomaticCar
import pygame
class Simulation:
def __init__(self, conf):
self._conf = conf
self._container = Container(conf.nb_lanes)
self._sim_time = 0
self._time_to_next_spawn = 0
def time_step(self, dt):
# loop over all vehicles, update all vehicles
# remove vehicles that are dead
for v in self:
self.time_step_vehicle(v, dt)
self.try_spawn_vehicle()
self._sim_time += dt
def time_step_vehicle(self, vehicle, dt):
vehicle.update(self._conf, self._container, dt)
if vehicle.position > self._conf.road_len:
self._despawn_vehicle(vehicle)
def _despawn_vehicle(self, vehicle):
self._container.despawn(vehicle)
def try_spawn_vehicle(self): #Tried to fix the spawning issue
# If the time has come to spawn new vehicle.
if self._sim_time >= self._time_to_next_spawn:
p = np.random.rand()
lane = np.random.randint(self._conf.nb_lanes)
if p < 0.45:
vehicle = Car(lane)
elif p >= 0.45 and p < 0.90:
vehicle = AutomaticCar(lane)
else: # p >= 0.9
lane = self._conf.nb_lanes - 1
vehicle = Truck(lane)
vehicle.velocity = np.random.uniform(
self._conf.speed_range[0],
self._conf.speed_range[1])
# If there already exists a vehicle in the lane.
if self._container.last(lane):
last = self._container.last(lane)
# If the safe distance is not held, don't spawn.
if last.position < last.extremely_safe_distance * 2:
self._time_to_next_spawn = self._sim_time + \
np.random.exponential(1/self._conf.spawn_rate)
return
# Else if distance is below 5 safe_distances, spawn with
# velocity depending on car in front.
elif last.position < (last.extremely_safe_distance * last.velocity*10):
vehicle.velocity = np.random.uniform(
last.velocity*0.5,
last.velocity*min(1, last.position/(2*last.extremely_safe_distance) + 1))
# Spawn the car.
self._spawn_vehicle(vehicle)
# Play an appropriate truck sound
if isinstance(vehicle, Truck) and self._conf.sound:
truckyeah = pygame.mixer.Sound('data/truckyeah.wav')
truckyeah.play()
# Find time to next car.
self._time_to_next_spawn = self._sim_time + \
np.random.exponential(1/self._conf.spawn_rate)
def _spawn_vehicle(self, vehicle):
self._container.spawn(vehicle)
def find_vehicle(self, pos, lane, max_dist=10):
v = Vehicle(lane)
v.position = pos
w = self._container.get_closest_vehicle(v, lane)
if w and abs(w.position-pos) < max_dist:
return w
return None
def __iter__(self):
return iter(self._container)
###############################################################################
# SIMULATION WITH HANDLERS #
###############################################################################
class SimulationWithHandlers(Simulation):
def __init__(self, conf, handlers=[]):
super().__init__(conf)
self._handlers = handlers
for h in self._handlers:
h._sim = self
def add_handler(self, handler):
self._handlers.append(handler)
def time_step(self, dt):
for h in self._handlers:
h.before_time_step(dt, self._sim_time)
super().time_step(dt)
for h in self._handlers:
h.after_time_step(dt, self._sim_time)
def time_step_vehicle(self, vehicle, dt):
for h in self._handlers:
h.before_vehicle_update(dt, vehicle)
super().time_step_vehicle(vehicle, dt)
for h in self._handlers:
h.after_vehicle_update(dt, vehicle)
def _spawn_vehicle(self, vehicle):
super()._spawn_vehicle(vehicle)
for h in self._handlers:
h.after_vehicle_spawn(vehicle, self._sim_time)
def _despawn_vehicle(self, vehicle):
for h in self._handlers:
h.before_vehicle_despawn(vehicle, self._sim_time)
super()._despawn_vehicle(vehicle)