Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance tuning metric logging #132

Open
wants to merge 2 commits into
base: performance_tuning
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def update_agent(self, action):
raise NotImplementedError('Agent control must be able to update agent')

@abc.abstractmethod
def judge_action(self, agents_info_map):
def judge_action(self, agents_info_map, next_state):
'''Returns the reward, done flag, step metrics after action is taken

Args:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ def update_agent(self, action):
AgentInfo.START_NDIST.value: 0.0}})
return self.bot_car_progresses

def judge_action(self, agents_info_map):
def judge_action(self, agents_info_map, next_state):
'''Judge action to see whether reset is needed

Args:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def update_agent(self, action):
self._update_track_data_object_poses()
return {}

def judge_action(self, agents_info_map):
def judge_action(self, agents_info_map, next_state):
for agent_name, agent_info in agents_info_map.items():
# check racecar crash with a obstacle
crashed_object_name = agent_info[AgentInfo.CRASHED_OBJECT_NAME.value] \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import rospy
import logging
import json
import time
from threading import RLock
from gazebo_msgs.msg import ModelState
from std_msgs.msg import Float64, String
Expand All @@ -32,6 +33,7 @@
CarControlTopic, WEBRTC_CAR_CTRL_FORMAT)
from markov.visual_effects.effects.blink_effect import BlinkEffect
from markov.constants import DEFAULT_PARK_POSITION, SIMAPP_VERSION_4
from markov.architecture.constants import Headers
from markov.gazebo_tracker.trackers.get_link_state_tracker import GetLinkStateTracker
from markov.gazebo_tracker.trackers.get_model_state_tracker import GetModelStateTracker
from markov.gazebo_tracker.trackers.set_model_state_tracker import SetModelStateTracker
Expand Down Expand Up @@ -188,8 +190,25 @@ def __init__(self, config_dict, run_phase_sink, metrics):
CarControlTopic.STATUS_CTRL.value),
String,
self._update_car_status)

# TIMING
TIME_WINDOW=10
self.time = np.zeros([TIME_WINDOW, 2])

AbstractTracker.__init__(self, TrackerPriority.HIGH)

def get_rtf(self):
index = int(self._data_dict_['steps']) % self.time.shape[0]
self.time[index,0] = time.time()
self.time[index,1] = rospy.get_time()

wall_time_incr = np.max(self.time[:,0]) - np.min(self.time[:,0])
sim_time_incr = np.max(self.time[:,1]) - np.min(self.time[:,1])

rtf = sim_time_incr / wall_time_incr

return rtf

def update_tracker(self, delta_time, sim_time):
"""
Callback when sim time is updated
Expand Down Expand Up @@ -648,7 +667,7 @@ def _update_speed(self, action):
new_speed = max(min(const.MAX_SPEED, new_speed), const.MIN_SPEED)
return float(new_speed / self._wheel_radius_)

def judge_action(self, agents_info_map):
def judge_action(self, agents_info_map, headers):
'''Judge the action that agent just take

Args:
Expand Down Expand Up @@ -701,6 +720,21 @@ def judge_action(self, agents_info_map):
self._step_metrics_[StepMetrics.TIME.value] = self._current_sim_time
self._step_metrics_[StepMetrics.EPISODE_STATUS.value] = episode_status
self._step_metrics_[StepMetrics.PAUSE_DURATION.value] = self._pause_duration

self._step_metrics_[StepMetrics.RTF.value] = self.get_rtf()

if Headers.STEREO.value in headers:
left_header, right_header = headers[Headers.STEREO.value][0], headers[Headers.STEREO.value][1]
self._step_metrics_[StepMetrics.LEFT_CAM_TIME.value] = left_header.stamp.secs + left_header.stamp.nsecs * 1e-9
self._step_metrics_[StepMetrics.LEFT_CAM_SEQ.value] = left_header.seq
self._step_metrics_[StepMetrics.RIGHT_CAM_TIME.value] = right_header.stamp.secs + right_header.stamp.nsecs * 1e-9
self._step_metrics_[StepMetrics.RIGHT_CAM_SEQ.value] = right_header.seq
else:
self._step_metrics_[StepMetrics.LEFT_CAM_TIME.value] = 0.0
self._step_metrics_[StepMetrics.LEFT_CAM_SEQ.value] = 0
self._step_metrics_[StepMetrics.RIGHT_CAM_TIME.value] = 0.0
self._step_metrics_[StepMetrics.RIGHT_CAM_SEQ.value] = 0

self._data_dict_['prev_progress'] = 0.0 if self._step_metrics_[StepMetrics.PROG.value] == 100 \
else self._step_metrics_[StepMetrics.PROG.value]
if self._data_dict_['current_progress'] == 100:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def send_action(self, action):
def update_agent(self, action):
return {}

def judge_action(self, agents_info_map):
def judge_action(self, agents_info_map, next_state):
return None, None, None

def finish_episode(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def reset_agent(self):
sensor_state = None
if self._sensor_ is not None:
self._sensor_.reset()
sensor_state = self._sensor_.get_state()
sensor_state, _ = self._sensor_.get_state()
self._ctrl_.reset_agent()
return sensor_state

Expand Down Expand Up @@ -90,10 +90,10 @@ def judge_action(self, action, agents_info_map):
tuple: tuple contains next state sensor observation, reward value, and done flag
'''
if self._sensor_ is not None:
next_state = self._sensor_.get_state()
next_state, next_headers = self._sensor_.get_state()
else:
next_state = None
reward, done, _ = self._ctrl_.judge_action(agents_info_map)
next_state, next_headers = None, None
reward, done, _ = self._ctrl_.judge_action(agents_info_map, next_headers)
if hasattr(self._ctrl_, 'reward_data_pub') and self._ctrl_.reward_data_pub is not None:
raw_state = self._sensor_.get_raw_state()
# More visualizations topics can be added here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,24 @@ class Input(Enum):
CAMERA = 'FRONT_FACING_CAMERA'
LEFT_CAMERA = 'LEFT_CAMERA'
STEREO = 'STEREO_CAMERAS'


@classmethod
def validate_inputs(cls, input_list):
'''Returns True if the all the inputs in input_list is supported, False otherwise'''
return all([_input in cls._value2member_map_ for _input in input_list])

class Headers(Enum):
'''Enum with available inputs, as we add sensors we should add inputs.
This is also important for networks with more than one input.
'''
OBSERVATION = 'observation_headers'
LIDAR = 'LIDAR_HEADERS'
SECTOR_LIDAR = 'SECTOR_LIDAR_HEADERS' # BINARY_SECTOR_LIDAR
DISCRETIZED_SECTOR_LIDAR = 'DISCRETIZED_SECTOR_LIDAR_HEADERS'
CAMERA = 'FRONT_FACING_CAMERA_HEADERS'
LEFT_CAMERA = 'LEFT_CAMERA_HEADERS'
STEREO = 'STEREO_CAMERAS_HEADERS'

@classmethod
def validate_inputs(cls, input_list):
'''Returns True if the all the inputs in input_list is supported, False otherwise'''
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def main():
parser.add_argument('--s3_endpoint_url',
help='(string) S3 endpoint URL',
type=str,
default=rospy.get_param("S3_ENDPOINT_URL", None))
default=rospy.get_param("S3_ENDPOINT_URL", None))
parser.add_argument('--aws_region',
help='(string) AWS region',
type=str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ class StepMetrics(Enum):
TIME = 'tstamp'
EPISODE_STATUS = 'episode_status'
PAUSE_DURATION = 'pause_duration'
RTF = 'rtf'
LEFT_CAM_TIME = 'left_cam_time'
LEFT_CAM_SEQ = 'left_cam_seq'
RIGHT_CAM_TIME = 'right_cam_time'
RIGHT_CAM_SEQ = 'right_cam_seq'

@classmethod
def make_default_metric(cls):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def sim_trace_log(sim_trace_dict):
sim_trace_dict - Ordered dict containing the step metrics, note order must match
precision in the string
'''
LOGGER.info('SIM_TRACE_LOG:%d,%d,%.4f,%.4f,%.4f,%.2f,%.2f,%s,%.4f,%s,%s,%.4f,%d,%.2f,%s,%s,%.2f\n' % \
LOGGER.info('SIM_TRACE_LOG:%d,%d,%.4f,%.4f,%.4f,%.2f,%.2f,%s,%.4f,%s,%s,%.4f,%d,%.2f,%s,%s,%.2f,%.3f,%.3f,%d,%.3f,%d\n' % \
(tuple(sim_trace_dict.values())))


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def rollout_worker(graph_manager, num_workers, rollout_idx, task_parameters, sim
# upload simtrace and mp4 into s3 bucket
for s3_writer in simtrace_video_s3_writers:
s3_writer.persist(utils.get_s3_kms_extra_args())
graph_manager.phase = RunPhase.WAITING
graph_manager.phase = RunPhase.WAITING
pause_physics(EmptyRequest())

while new_checkpoint < last_checkpoint + 1:
Expand Down Expand Up @@ -230,7 +230,7 @@ def rollout_worker(graph_manager, num_workers, rollout_idx, task_parameters, sim
graph_manager.restore_checkpoint()

last_checkpoint = new_checkpoint

logger.info("Exited main loop. Done.")

def main():
Expand All @@ -251,15 +251,15 @@ def main():
parser.add_argument('--s3_endpoint_url',
help='(string) S3 endpoint URL',
type=str,
default=rospy.get_param("S3_ENDPOINT_URL", None))
default=rospy.get_param("S3_ENDPOINT_URL", None))
parser.add_argument('--num_workers',
help="(int) The number of workers started in this pool",
type=int,
default=int(rospy.get_param("NUM_WORKERS", 1)))
parser.add_argument('--rollout_idx',
help="(int) The index of current rollout worker",
type=int,
default=0)
default=0)
parser.add_argument('-r', '--redis_ip',
help="(string) IP or host for the redis server",
default='localhost',
Expand Down Expand Up @@ -388,7 +388,7 @@ def main():
metrics_s3_config = {MetricsS3Keys.METRICS_BUCKET.value: rospy.get_param('METRICS_S3_BUCKET'),
MetricsS3Keys.METRICS_KEY.value: metrics_key,
MetricsS3Keys.ENDPOINT_URL.value: rospy.get_param('S3_ENDPOINT_URL', None),
MetricsS3Keys.REGION.value: rospy.get_param('AWS_REGION')}
MetricsS3Keys.REGION.value: rospy.get_param('AWS_REGION')}

run_phase_subject = RunPhaseSubject()

Expand Down Expand Up @@ -466,7 +466,7 @@ def main():
ip_config = IpConfig(bucket=args.s3_bucket,
s3_prefix=args.s3_prefix,
region_name=args.aws_region,
s3_endpoint_url=args.s3_endpoint_url,
s3_endpoint_url=args.s3_endpoint_url,
local_path=IP_ADDRESS_LOCAL_PATH.format('agent'))
redis_ip = ip_config.get_ip_config()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def get_observation_space(self):

def get_state(self, block=True):
state = dict()
headers = dict()

# For blocking requests, run a blocking call on each sensor
if block:
Expand All @@ -39,15 +40,20 @@ def get_state(self, block=True):
# will use the latest Lidar data whether it was used previously or not.
if isinstance(sensor, LidarInterface):
continue
state.update(sensor.get_state(block=block))

state_value, header_value = sensor.get_state(block=block)
state.update(state_value)
headers.update(header_value)

# For all requests, follow-up with a non-blocking call
# This will ensure we have the latest for all sensors in the event that one of the
# earlier sensors in the list published new data while waiting on a blocking call above
for sensor in self.sensors:
state.update(sensor.get_state(block=False))
state_value, header_value = sensor.get_state(block=False)
state.update(state_value)
headers.update(header_value)

return state
return state, headers

def get_raw_state(self):
raw_data = dict()
Expand Down
Loading