Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance tuning metric logging #132

Open
wants to merge 2 commits into
base: performance_tuning
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add camera time logging
  • Loading branch information
jochem725 committed Mar 2, 2022
commit be835c5125717c848c342394590eb2f2e93860e5
Original file line number Diff line number Diff line change
@@ -49,7 +49,7 @@ def update_agent(self, action):
raise NotImplementedError('Agent control must be able to update agent')

@abc.abstractmethod
def judge_action(self, agents_info_map):
def judge_action(self, agents_info_map, next_state):
'''Returns the reward, done flag, step metrics after action is taken

Args:
Original file line number Diff line number Diff line change
@@ -335,7 +335,7 @@ def update_agent(self, action):
AgentInfo.START_NDIST.value: 0.0}})
return self.bot_car_progresses

def judge_action(self, agents_info_map):
def judge_action(self, agents_info_map, next_state):
'''Judge action to see whether reset is needed

Args:
Original file line number Diff line number Diff line change
@@ -160,7 +160,7 @@ def update_agent(self, action):
self._update_track_data_object_poses()
return {}

def judge_action(self, agents_info_map):
def judge_action(self, agents_info_map, next_state):
for agent_name, agent_info in agents_info_map.items():
# check racecar crash with a obstacle
crashed_object_name = agent_info[AgentInfo.CRASHED_OBJECT_NAME.value] \
Original file line number Diff line number Diff line change
@@ -32,6 +32,7 @@
CarControlTopic, WEBRTC_CAR_CTRL_FORMAT)
from markov.visual_effects.effects.blink_effect import BlinkEffect
from markov.constants import DEFAULT_PARK_POSITION, SIMAPP_VERSION_4
from markov.architecture.constants import Headers
from markov.gazebo_tracker.trackers.get_link_state_tracker import GetLinkStateTracker
from markov.gazebo_tracker.trackers.get_model_state_tracker import GetModelStateTracker
from markov.gazebo_tracker.trackers.set_model_state_tracker import SetModelStateTracker
@@ -648,7 +649,7 @@ def _update_speed(self, action):
new_speed = max(min(const.MAX_SPEED, new_speed), const.MIN_SPEED)
return float(new_speed / self._wheel_radius_)

def judge_action(self, agents_info_map):
def judge_action(self, agents_info_map, headers):
'''Judge the action that agent just take

Args:
@@ -701,6 +702,19 @@ def judge_action(self, agents_info_map):
self._step_metrics_[StepMetrics.TIME.value] = self._current_sim_time
self._step_metrics_[StepMetrics.EPISODE_STATUS.value] = episode_status
self._step_metrics_[StepMetrics.PAUSE_DURATION.value] = self._pause_duration

if Headers.STEREO.value in headers:
left_header, right_header = headers[Headers.STEREO.value][0], headers[Headers.STEREO.value][1]
self._step_metrics_[StepMetrics.LEFT_CAM_TIME.value] = left_header.stamp.secs + left_header.stamp.nsecs * 1e-9
self._step_metrics_[StepMetrics.LEFT_CAM_SEQ.value] = left_header.seq
self._step_metrics_[StepMetrics.RIGHT_CAM_TIME.value] = right_header.stamp.secs + right_header.stamp.nsecs * 1e-9
self._step_metrics_[StepMetrics.RIGHT_CAM_SEQ.value] = right_header.seq
else:
self._step_metrics_[StepMetrics.LEFT_CAM_TIME.value] = 0.0
self._step_metrics_[StepMetrics.LEFT_CAM_SEQ.value] = 0
self._step_metrics_[StepMetrics.RIGHT_CAM_TIME.value] = 0.0
self._step_metrics_[StepMetrics.RIGHT_CAM_SEQ.value] = 0

self._data_dict_['prev_progress'] = 0.0 if self._step_metrics_[StepMetrics.PROG.value] == 100 \
else self._step_metrics_[StepMetrics.PROG.value]
if self._data_dict_['current_progress'] == 100:
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ def send_action(self, action):
def update_agent(self, action):
return {}

def judge_action(self, agents_info_map):
def judge_action(self, agents_info_map, next_state):
return None, None, None

def finish_episode(self):
Original file line number Diff line number Diff line change
@@ -48,7 +48,7 @@ def reset_agent(self):
sensor_state = None
if self._sensor_ is not None:
self._sensor_.reset()
sensor_state = self._sensor_.get_state()
sensor_state, _ = self._sensor_.get_state()
self._ctrl_.reset_agent()
return sensor_state

@@ -90,10 +90,10 @@ def judge_action(self, action, agents_info_map):
tuple: tuple contains next state sensor observation, reward value, and done flag
'''
if self._sensor_ is not None:
next_state = self._sensor_.get_state()
next_state, next_headers = self._sensor_.get_state()
else:
next_state = None
reward, done, _ = self._ctrl_.judge_action(agents_info_map)
next_state, next_headers = None, None
reward, done, _ = self._ctrl_.judge_action(agents_info_map, next_headers)
if hasattr(self._ctrl_, 'reward_data_pub') and self._ctrl_.reward_data_pub is not None:
raw_state = self._sensor_.get_raw_state()
# More visualizations topics can be added here
Original file line number Diff line number Diff line change
@@ -33,7 +33,24 @@ class Input(Enum):
CAMERA = 'FRONT_FACING_CAMERA'
LEFT_CAMERA = 'LEFT_CAMERA'
STEREO = 'STEREO_CAMERAS'


@classmethod
def validate_inputs(cls, input_list):
'''Returns True if the all the inputs in input_list is supported, False otherwise'''
return all([_input in cls._value2member_map_ for _input in input_list])

class Headers(Enum):
'''Enum with available inputs, as we add sensors we should add inputs.
This is also important for networks with more than one input.
'''
OBSERVATION = 'observation_headers'
LIDAR = 'LIDAR_HEADERS'
SECTOR_LIDAR = 'SECTOR_LIDAR_HEADERS' # BINARY_SECTOR_LIDAR
DISCRETIZED_SECTOR_LIDAR = 'DISCRETIZED_SECTOR_LIDAR_HEADERS'
CAMERA = 'FRONT_FACING_CAMERA_HEADERS'
LEFT_CAMERA = 'LEFT_CAMERA_HEADERS'
STEREO = 'STEREO_CAMERAS_HEADERS'

@classmethod
def validate_inputs(cls, input_list):
'''Returns True if the all the inputs in input_list is supported, False otherwise'''
Original file line number Diff line number Diff line change
@@ -156,7 +156,7 @@ def main():
parser.add_argument('--s3_endpoint_url',
help='(string) S3 endpoint URL',
type=str,
default=rospy.get_param("S3_ENDPOINT_URL", None))
default=rospy.get_param("S3_ENDPOINT_URL", None))
parser.add_argument('--aws_region',
help='(string) AWS region',
type=str,
Original file line number Diff line number Diff line change
@@ -37,6 +37,10 @@ class StepMetrics(Enum):
TIME = 'tstamp'
EPISODE_STATUS = 'episode_status'
PAUSE_DURATION = 'pause_duration'
LEFT_CAM_TIME = 'left_cam_time'
LEFT_CAM_SEQ = 'left_cam_seq'
RIGHT_CAM_TIME = 'right_cam_time'
RIGHT_CAM_SEQ = 'right_cam_seq'

@classmethod
def make_default_metric(cls):
Original file line number Diff line number Diff line change
@@ -41,7 +41,7 @@ def sim_trace_log(sim_trace_dict):
sim_trace_dict - Ordered dict containing the step metrics, note order must match
precision in the string
'''
LOGGER.info('SIM_TRACE_LOG:%d,%d,%.4f,%.4f,%.4f,%.2f,%.2f,%s,%.4f,%s,%s,%.4f,%d,%.2f,%s,%s,%.2f\n' % \
LOGGER.info('SIM_TRACE_LOG:%d,%d,%.4f,%.4f,%.4f,%.2f,%.2f,%s,%.4f,%s,%s,%.4f,%d,%.2f,%s,%s,%.2f,%.3f,%d,%.3f,%d\n' % \
(tuple(sim_trace_dict.values())))


Original file line number Diff line number Diff line change
@@ -200,7 +200,7 @@ def rollout_worker(graph_manager, num_workers, rollout_idx, task_parameters, sim
# upload simtrace and mp4 into s3 bucket
for s3_writer in simtrace_video_s3_writers:
s3_writer.persist(utils.get_s3_kms_extra_args())
graph_manager.phase = RunPhase.WAITING
graph_manager.phase = RunPhase.WAITING
pause_physics(EmptyRequest())

while new_checkpoint < last_checkpoint + 1:
@@ -230,7 +230,7 @@ def rollout_worker(graph_manager, num_workers, rollout_idx, task_parameters, sim
graph_manager.restore_checkpoint()

last_checkpoint = new_checkpoint

logger.info("Exited main loop. Done.")

def main():
@@ -251,15 +251,15 @@ def main():
parser.add_argument('--s3_endpoint_url',
help='(string) S3 endpoint URL',
type=str,
default=rospy.get_param("S3_ENDPOINT_URL", None))
default=rospy.get_param("S3_ENDPOINT_URL", None))
parser.add_argument('--num_workers',
help="(int) The number of workers started in this pool",
type=int,
default=int(rospy.get_param("NUM_WORKERS", 1)))
parser.add_argument('--rollout_idx',
help="(int) The index of current rollout worker",
type=int,
default=0)
default=0)
parser.add_argument('-r', '--redis_ip',
help="(string) IP or host for the redis server",
default='localhost',
@@ -388,7 +388,7 @@ def main():
metrics_s3_config = {MetricsS3Keys.METRICS_BUCKET.value: rospy.get_param('METRICS_S3_BUCKET'),
MetricsS3Keys.METRICS_KEY.value: metrics_key,
MetricsS3Keys.ENDPOINT_URL.value: rospy.get_param('S3_ENDPOINT_URL', None),
MetricsS3Keys.REGION.value: rospy.get_param('AWS_REGION')}
MetricsS3Keys.REGION.value: rospy.get_param('AWS_REGION')}

run_phase_subject = RunPhaseSubject()

@@ -466,7 +466,7 @@ def main():
ip_config = IpConfig(bucket=args.s3_bucket,
s3_prefix=args.s3_prefix,
region_name=args.aws_region,
s3_endpoint_url=args.s3_endpoint_url,
s3_endpoint_url=args.s3_endpoint_url,
local_path=IP_ADDRESS_LOCAL_PATH.format('agent'))
redis_ip = ip_config.get_ip_config()

Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@ def get_observation_space(self):

def get_state(self, block=True):
state = dict()
headers = dict()

# For blocking requests, run a blocking call on each sensor
if block:
@@ -39,15 +40,20 @@ def get_state(self, block=True):
# will use the latest Lidar data whether it was used previously or not.
if isinstance(sensor, LidarInterface):
continue
state.update(sensor.get_state(block=block))

state_value, header_value = sensor.get_state(block=block)
state.update(state_value)
headers.update(header_value)

# For all requests, follow-up with a non-blocking call
# This will ensure we have the latest for all sensors in the event that one of the
# earlier sensors in the list published new data while waiting on a blocking call above
for sensor in self.sensors:
state.update(sensor.get_state(block=False))
state_value, header_value = sensor.get_state(block=False)
state.update(state_value)
headers.update(header_value)

return state
return state, headers

def get_raw_state(self):
raw_data = dict()
Original file line number Diff line number Diff line change
@@ -11,11 +11,12 @@
from markov.sensors.sensor_interface import SensorInterface, LidarInterface
from markov.environments.constants import TRAINING_IMAGE_SIZE
from markov.architecture.constants import Input
from markov.architecture.constants import Headers
from markov.log_handler.deepracer_exceptions import GenericRolloutException, GenericError
from markov import utils
from markov.log_handler.logger import Logger
from markov.log_handler.constants import SIMAPP_SIMULATION_WORKER_EXCEPTION


LOGGER = Logger(__name__, logging.INFO).get_logger()

@@ -81,9 +82,9 @@ def get_state(self, block=True):
image_data.data, 'raw', 'RGB', 0, 1)
image = image.resize(TRAINING_IMAGE_SIZE, resample=2)
self.raw_data = image_data
return {Input.CAMERA.value: np.array(image)}
return {Input.CAMERA.value: np.array(image)}, {}
except utils.DoubleBuffer.Empty:
return {}
return {}, {}
except Exception as ex:
raise GenericRolloutException("Unable to set state: {}".format(ex))

@@ -123,7 +124,7 @@ def __init__(self, racecar_name, timeout=120.0):
self.sensor_type = Input.OBSERVATION.value
self.raw_data = None
self.timeout = timeout

def get_observation_space(self):
try:
return get_observation_space(Input.OBSERVATION.value)
@@ -141,9 +142,9 @@ def get_state(self, block=True):
image_data.data, 'raw', 'RGB', 0, 1)
image = image.resize(TRAINING_IMAGE_SIZE, resample=2)
self.raw_data = image_data
return {Input.OBSERVATION.value: np.array(image)}
return {Input.OBSERVATION.value: np.array(image)}, {}
except utils.DoubleBuffer.Empty:
return {}
return {}, {}
except Exception as ex:
raise GenericRolloutException("Unable to set state: {}".format(ex))

@@ -204,9 +205,9 @@ def get_state(self, block=True):
image_data.data, 'raw', 'RGB', 0, 1)
image = image.resize(TRAINING_IMAGE_SIZE, resample=2)
self.raw_data = image_data
return {Input.LEFT_CAMERA.value: np.array(image)}
return {Input.LEFT_CAMERA.value: np.array(image)}, {}
except utils.DoubleBuffer.Empty:
return {}
return {}, {}
except Exception as ex:
raise GenericRolloutException("Unable to set state: {}".format(ex))

@@ -268,15 +269,17 @@ def get_state(self, block=True):
left_img = Image.frombytes('RGB', (image_data.width, image_data.height),
image_data.data, 'raw', 'RGB', 0, 1)
left_img = left_img.resize(TRAINING_IMAGE_SIZE, resample=2).convert('L')
left_header = image_data.header

image_data = self.image_buffer_right.get(block=block, timeout=self.timeout)
right_img = Image.frombytes('RGB', (image_data.width, image_data.height),
image_data.data, 'raw', 'RGB', 0, 1)
right_img = right_img.resize(TRAINING_IMAGE_SIZE, resample=2).convert('L')
right_header = image_data.header

return {Input.STEREO.value: np.array(np.stack((left_img, right_img), axis=2))}
return {Input.STEREO.value: np.array(np.stack((left_img, right_img), axis=2)) }, { Headers.STEREO.value: [left_header, right_header] }
except utils.DoubleBuffer.Empty:
return {}
return {}, {}
except Exception as ex:
raise GenericRolloutException("Unable to set state: {}".format(ex))

@@ -337,7 +340,7 @@ def get_observation_space(self):

def get_state(self, block=True):
try:
return {self.sensor_type: self.data_buffer.get(block=block, timeout=self.timeout)}
return {self.sensor_type: self.data_buffer.get(block=block, timeout=self.timeout)}, {}
except utils.DoubleBuffer.Empty:
# For Lidar, we always call non-blocking get_state instead of
# block-waiting for new state, and the expectation is
@@ -348,7 +351,7 @@ def get_state(self, block=True):
# Empty, in such case, this may cause issue for the inference due to
# incompatible input to NN. Thus, we should get sensor data with blocking if
# DoubleBuffer.Empty is raised.
return {self.sensor_type: self.data_buffer.get(block=True, timeout=self.timeout)}
return {self.sensor_type: self.data_buffer.get(block=True, timeout=self.timeout)}, {}
except Exception as ex:
raise GenericRolloutException("Unable to set state: {}".format(ex))

@@ -396,7 +399,7 @@ def get_observation_space(self):

def get_state(self, block=True):
try:
return {self.sensor_type: self.data_buffer.get(block=block, timeout=self.timeout)}
return {self.sensor_type: self.data_buffer.get(block=block, timeout=self.timeout)}, {}
except utils.DoubleBuffer.Empty:
# For Lidar, we always call non-blocking get_state instead of
# block-waiting for new state, and the expectation is
@@ -407,7 +410,7 @@ def get_state(self, block=True):
# Empty, in such case, this may cause issue for the inference due to
# incompatible input to NN. Thus, we should get sensor data with blocking if
# DoubleBuffer.Empty is raised.
return {self.sensor_type: self.data_buffer.get(block=True, timeout=self.timeout)}
return {self.sensor_type: self.data_buffer.get(block=True, timeout=self.timeout)}, {}
except Exception as ex:
raise GenericRolloutException("Unable to set state: {}".format(ex))

@@ -455,7 +458,7 @@ def get_observation_space(self):

def get_state(self, block=True):
try:
return {self.sensor_type: self.data_buffer.get(block=block, timeout=self.timeout)}
return {self.sensor_type: self.data_buffer.get(block=block, timeout=self.timeout)}, {}
except utils.DoubleBuffer.Empty:
# For Lidar, we always call non-blocking get_state instead of
# block-waiting for new state, and the expectation is
@@ -466,7 +469,7 @@ def get_state(self, block=True):
# Empty, in such case, this may cause issue for the inference due to
# incompatible input to NN. Thus, we should get sensor data with blocking if
# DoubleBuffer.Empty is raised.
return {self.sensor_type: self.data_buffer.get(block=True, timeout=self.timeout)}
return {self.sensor_type: self.data_buffer.get(block=True, timeout=self.timeout)}, {}
except Exception as ex:
raise GenericRolloutException("Unable to set state: {}".format(ex))

Loading