Skip to content

Commit

Permalink
[xcvrd] Enhance xcvrd to handle new system level event/error (#39)
Browse files Browse the repository at this point in the history
* make xcvrd wait when system not ready
* [xcvrd] fix indents and tense error.
* [xcvrd]update retry-until-ready logic in state-machine approach
* [xcvrd] receiving a normal event when "SYSTEM_NOT_READY", transmit the state to "SYSTEM_READY"
* [xcvrd]set timeout = 0 when enters SYSTEM_READY state.
* [xcvrd]
update the logic of sfp_state_update_task in a state machine approach
the definition of state machine is in front of sfp_state_update_task.task_worker
* [xcvrd]address review comments
* [xcvrd] fix merging confilcts
* [xcvrd] improve readability.
  • Loading branch information
keboliu authored and qiluo-msft committed Aug 10, 2019
1 parent be616c8 commit 88d6d5a
Showing 1 changed file with 182 additions and 29 deletions.
211 changes: 182 additions & 29 deletions sonic-xcvrd/scripts/xcvrd
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,27 @@ XCVRD_MAIN_THREAD_SLEEP_SECS = 60
SFP_STATUS_INSERTED = '1'
SFP_STATUS_REMOVED = '0'

EVENT_ON_ALL_SFP = '-1'
# events definition
SYSTEM_NOT_READY = 'system_not_ready'
SYSTEM_BECOME_READY = 'system_become_ready'
SYSTEM_FAIL = 'system_fail'
NORMAL_EVENT = 'normal'
# states definition
STATE_INIT = 0
STATE_NORMAL = 1
STATE_EXIT = 2

PHYSICAL_PORT_NOT_EXIST = -1
SFP_EEPROM_NOT_READY = -2

SFPUTIL_LOAD_ERROR = 1
PORT_CONFIG_LOAD_ERROR = 2
NOT_IMPLEMENTED_ERROR = 3

RETRY_TIMES_FOR_SYSTEM_READY = 24
RETRY_PERIOD_FOR_SYSTEM_READY_MSECS = 5000

TEMP_UNIT = 'C'
VOLT_UNIT = 'Volts'
POWER_UNIT = 'dBm'
Expand Down Expand Up @@ -130,15 +144,15 @@ def _wrapper_get_transceiver_dom_threshold_info(physical_port):

return platform_sfputil.get_transceiver_dom_threshold_info_dict(physical_port)

def _wrapper_get_transceiver_change_event():
def _wrapper_get_transceiver_change_event(timeout):
if platform_chassis is not None:
try:
status, events = platform_chassis.get_change_event()
status, events = platform_chassis.get_change_event(timeout)
sfp_events = events['sfp']
return status, sfp_events
except NotImplementedError:
pass
return platform_sfputil.get_transceiver_change_event()
return platform_sfputil.get_transceiver_change_event(timeout)

# Remove unnecessary unit from the raw data
def beautify_dom_info_dict(dom_info_dict):
Expand Down Expand Up @@ -657,6 +671,30 @@ class sfp_state_update_task:
self.task_process = None
self.task_stopping_event = multiprocessing.Event()

def _mapping_event_from_change_event(self, status, port_dict):
"""
mapping from what get_transceiver_change_event returns to event defined in the state machine
the logic is pretty straightforword
"""
if status:
if bool(port_dict):
event = NORMAL_EVENT
else:
event = SYSTEM_BECOME_READY
# here, a simple timeout event whose port_dict is empty is mapped
# into a SYSTEM_BECOME_READY event so that it can be handled
port_dict[EVENT_ON_ALL_SFP] = SYSTEM_BECOME_READY
else:
if EVENT_ON_ALL_SFP in port_dict.keys():
event = port_dict[EVENT_ON_ALL_SFP]
else:
# this should not happen. just for protection
event = SYSTEM_FAIL
port_dict[EVENT_ON_ALL_SFP] = SYSTEM_FAIL

logger.log_debug("mapping from {} {} to {}".format(status, port_dict, event))
return event

def task_worker(self, stopping_event):
logger.log_info("Start SFP monitoring loop")

Expand All @@ -672,36 +710,151 @@ class sfp_state_update_task:
swsscommon.APP_PORT_TABLE_NAME)

# Start loop to listen to the sfp change event
# The state migrating sequence:
# 1. When the system starts, it is in "INIT" state, calling get_transceiver_change_event
# with RETRY_PERIOD_FOR_SYSTEM_READY_MSECS as timeout for as many as RETRY_TIMES_FOR_SYSTEM_READY
# times
# 2. Once 'system_become_ready' returned, the system enters "SYSTEM_READY" state and starts to monitor
# the insertion/removal event of all the SFP modules.
# In this state, receiving any system level event will be treated as an unrecoverable error and cause
# the daemon exit

# states definition
# - Initial state: INIT, before received system ready or a normal event
# - Final state: EXIT
# - other state: NORMAL, after has received system-ready or a normal event

# events definition
# - SYSTEM_NOT_READY
# - SYSTEM_BECOME_READY
# -
# - NORMAL_EVENT
# - sfp insertion/removal
# - timeout returned by sfputil.get_change_event with status = true
# - SYSTEM_FAIL

# State transmit:
# 1. SYSTEM_NOT_READY
# - INIT
# - retry < RETRY_TIMES_FOR_SYSTEM_READY
# retry ++
# - else
# max retry reached, treat as fatal, exit
# - NORMAL
# Treat as a fatal error, exit
# 2. SYSTEM_BECOME_READY
# - INIT
# transmit to NORMAL
# - NORMAL
# log the event
# nop
# 3. NORMAL_EVENT
# - INIT (for the vendors who don't implement SYSTEM_BECOME_READY)
# transmit to NORMAL
# handle the event normally
# - NORMAL
# handle the event normally
# 4. SYSTEM_FAIL
# treat as a fatal error

# State event next state
# INIT SYSTEM NOT READY INIT / EXIT
# INIT SYSTEM BECOME READY NORMAL
# NORMAL SYSTEM BECOME READY NORMAL
# INIT/NORMAL SYSTEM FAIL EXIT
# INIT/NORMAL NORMAL EVENT NORMAL
# NORMAL SYSTEM NOT READY EXIT
# EXIT -

retry = 0
timeout = RETRY_PERIOD_FOR_SYSTEM_READY_MSECS
state = STATE_INIT
while not stopping_event.is_set():
status, port_dict = _wrapper_get_transceiver_change_event()
if status:
for key, value in port_dict.iteritems():
logical_port_list = platform_sfputil.get_physical_to_logical(int(key))
for logical_port in logical_port_list:
if value == SFP_STATUS_INSERTED:
logger.log_info("Got SFP inserted event {}".format(logical_port))
rc = post_port_sfp_info_to_db(logical_port, int_tbl, transceiver_dict)
# If we didn't get the sfp info, assuming the eeprom is not ready, give a try again.
if rc == SFP_EEPROM_NOT_READY:
logger.log_warning("SFP EEPROM is not ready. One more try...")
time.sleep(TIME_FOR_SFP_READY_SECS)
post_port_sfp_info_to_db(logical_port, int_tbl, transceiver_dict)
post_port_dom_info_to_db(logical_port, dom_tbl)
post_port_dom_threshold_info_to_db(logical_port, dom_tbl)
notify_media_setting(logical_port, transceiver_dict, app_port_tbl)
transceiver_dict.clear()
elif value == SFP_STATUS_REMOVED:
logger.log_info("Got SFP removed event {}".format(logical_port))
del_port_sfp_dom_info_from_db(logical_port, int_tbl, dom_tbl)
else:
# TODO, SFP return error code, need handle accordingly.
continue
next_state = state
time_start = time.time()
status, port_dict = _wrapper_get_transceiver_change_event(timeout)
logger.log_debug("Got event {} {} in state {}".format(status, port_dict, state))
event = self._mapping_event_from_change_event(status, port_dict)
if event == SYSTEM_NOT_READY:
if state == STATE_INIT:
# system not ready, wait and retry
if retry >= RETRY_TIMES_FOR_SYSTEM_READY:
logger.log_error("System failed to get ready in {} secs or received system error. Exiting...".format((RETRY_PERIOD_FOR_SYSTEM_READY_MSECS/1000)*RETRY_TIMES_FOR_SYSTEM_READY))
next_state = STATE_EXIT
else:
retry = retry + 1

# get_transceiver_change_event may return immediately,
# we want the retry expired in expected time period,
# So need to calc the time diff,
# if time diff less that the pre-defined waiting time,
# use sleep() to complete the time.
time_now = time.time()
time_diff = time_now - time_start
if time_diff < RETRY_PERIOD_FOR_SYSTEM_READY_MSECS/1000:
time.sleep(RETRY_PERIOD_FOR_SYSTEM_READY_MSECS/1000 - time_diff)
elif state == STATE_NORMAL:
logger.log_error("Got system_not_ready in normal state, treat as fatal. Exiting...")
next_state = STATE_EXIT
else:
next_state = STATE_EXIT
elif event == SYSTEM_BECOME_READY:
if state == STATE_INIT:
next_state = STATE_NORMAL
logger.log_info("Got system_become_ready in init state, transmit to normal state")
elif state == STATE_NORMAL:
logger.log_info("Got system_become_ready in normal state, ignored")
else:
next_state = STATE_EXIT
elif event == NORMAL_EVENT:
if state == STATE_NORMAL or state == STATE_INIT:
if state == STATE_INIT:
next_state = STATE_NORMAL
# this is the originally logic that handled the transceiver change event
# this can be reached in two cases:
# 1. the state has been normal before got the event
# 2. the state was init and is transmitted to normal after got the event.
# this is for the vendors who don't implement "system_not_ready/system_becom_ready" logic
for key, value in port_dict.iteritems():
logical_port_list = platform_sfputil.get_physical_to_logical(int(key))
for logical_port in logical_port_list:
if value == SFP_STATUS_INSERTED:
logger.log_info("Got SFP inserted event")
rc = post_port_sfp_info_to_db(logical_port, int_tbl, transceiver_dict)
# If we didn't get the sfp info, assuming the eeprom is not ready, give a try again.
if rc == SFP_EEPROM_NOT_READY:
logger.log_warning("SFP EEPROM is not ready. One more try...")
time.sleep(TIME_FOR_SFP_READY_SECS)
post_port_sfp_info_to_db(logical_port, int_tbl, transceiver_dict)
post_port_dom_info_to_db(logical_port, dom_tbl)
post_port_dom_threshold_info_to_db(logical_port, dom_tbl)
notify_media_setting(logical_port, transceiver_dict, app_port_tbl)
transceiver_dict.clear()
elif value == SFP_STATUS_REMOVED:
logger.log_info("Got SFP removed event")
del_port_sfp_dom_info_from_db(logical_port, int_tbl, dom_tbl)
else:
# TODO, SFP return error code, need handle accordingly.
logger.log_warning("Got unknown event {}, ignored".format(value))
continue
else:
next_state = STATE_EXIT
elif event == SYSTEM_FAIL:
# no matter which state current it is, it's fatal
next_state = STATE_EXIT
logger.log_error("Got system_fail event on state {}, exiting".format(state))
else:
# If get_transceiver_change_event() return error, will clean up the DB and then exit
# TODO: next step need to define more error types to handle accordingly.
logger.log_error("Failed to get transceiver change event. Exiting...")
logger.log_warning("Got unknown event {} on state {}.".format(event, state))

if next_state != state:
logger.log_debug("State transmitted from {} to {}".format(state, next_state))
state = next_state

if next_state == STATE_EXIT:
os.kill(os.getppid(), signal.SIGTERM)
break
elif next_state == STATE_NORMAL:
timeout = 0

logger.log_info("Stop SFP monitoring loop")

Expand Down

0 comments on commit 88d6d5a

Please sign in to comment.