Skip to content

Commit

Permalink
Merge pull request #42 from keboliu/backport-state-machine
Browse files Browse the repository at this point in the history
[xcvrd] backport PR(#39) "Enhance xcvrd to handle new system level event/error" to 201811
  • Loading branch information
yxieca authored Aug 16, 2019
2 parents 42f64d8 + 7ab9888 commit 1039764
Showing 1 changed file with 200 additions and 36 deletions.
236 changes: 200 additions & 36 deletions sonic-xcvrd/scripts/xcvrd
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,24 @@ TIME_FOR_SFP_READY_SECS = 1
RETRIES_FOR_SPF_READY = 5
XCVRD_MAIN_THREAD_SLEEP_MSECS = 60000

# Number of SYSTEM_NOT_READY events the main loop tolerates while in STATE_INIT
# before it gives up and moves to STATE_EXIT.
RETRY_TIMES_FOR_SYSTEM_READY = 24
# Timeout, in milliseconds, passed to get_transceiver_change_event while the
# main loop is still in STATE_INIT.
RETRY_PERIOD_FOR_SYSTEM_READY_MSECS = 5000

# Per-port values found in port_dict that the main loop compares against.
SFP_STATUS_INSERTED = '1'
SFP_STATUS_REMOVED = '0'

# port_dict key under which a system-level event (one of the event strings
# below) is stored instead of a per-port status.
EVENT_ON_ALL_SFP = '-1'
# events definition
SYSTEM_NOT_READY = 'system_not_ready'
SYSTEM_BECOME_READY = 'system_become_ready'
SYSTEM_FAIL = 'system_fail'
NORMAL_EVENT = 'normal'
# states definition
STATE_INIT = 0
STATE_NORMAL = 1
STATE_EXIT = 2

# Return codes of the SFP EEPROM handling helpers: post_port_sfp_info_to_db
# returns SFP_EEPROM_HANDLE_SUCCESS on success, and its callers retry when they
# get SFP_EEPROM_NOT_READY.
SFP_EEPROM_HANDLE_SUCCESS = 0
# NOTE(review): presumably returned when a logical port has no physical port
# mapping -- usage is not visible here, confirm against the helpers.
PHYSICAL_PORT_NOT_EXIST = -1
SFP_EEPROM_NOT_READY = -2

Expand All @@ -63,6 +78,8 @@ VOLT_UNIT = 'Volts'
POWER_UNIT = 'dBm'
BIAS_UNIT = 'mA'

XCVRD_MAIN_TASK_RUNNING_FLAG = True

#========================== Syslog wrappers ==========================

def log_info(msg, also_print_to_console=False):
Expand Down Expand Up @@ -92,15 +109,16 @@ def log_error(msg, also_print_to_console=False):
#========================== Signal Handling ==========================

def signal_handler(sig, frame):
    """
    Handle signals delivered to the xcvrd process.

    SIGHUP is logged and ignored. SIGINT and SIGTERM are logged and clear
    XCVRD_MAIN_TASK_RUNNING_FLAG, so the loops in this file that are
    conditioned on that flag stop at their next check instead of the process
    being terminated from inside the handler. Any other signal is logged as
    a warning.

    Args:
        sig: number of the signal that was received.
        frame: stack frame passed in by the signal module (unused).
    """
    global XCVRD_MAIN_TASK_RUNNING_FLAG
    if sig == signal.SIGHUP:
        log_info("Caught SIGHUP - ignoring...")
        return
    elif sig == signal.SIGINT:
        log_info("Caught SIGINT - exiting...")
        XCVRD_MAIN_TASK_RUNNING_FLAG = False
    elif sig == signal.SIGTERM:
        log_info("Caught SIGTERM - exiting...")
        XCVRD_MAIN_TASK_RUNNING_FLAG = False
    else:
        # sig is an integer: format it rather than concatenating it to a str,
        # which would raise TypeError inside the handler.
        log_warning("Caught unhandled signal '{}'".format(sig))
        return
Expand Down Expand Up @@ -256,6 +274,8 @@ def post_port_sfp_info_to_db(logical_port_name, table):
log_error("This functionality is currently not implemented for this platform")
sys.exit(3)

return SFP_EEPROM_HANDLE_SUCCESS

# update dom sensor info to db
def post_port_dom_info_to_db(logical_port_name, table):
ganged_port = False
Expand Down Expand Up @@ -333,8 +353,32 @@ def recover_missing_sfp_table_entries(sfp_util, int_tbl):
logical_port_list = sfp_util.logical
for logical_port_name in logical_port_list:
if logical_port_name not in keys:
post_port_sfp_info_to_db(logical_port_name, int_tbl)
log_info("Port {} has been recovered".format(logical_port_name))
rc = post_port_sfp_info_to_db(logical_port_name, int_tbl)
if rc == SFP_EEPROM_HANDLE_SUCCESS:
log_info("Port {} has been recovered".format(logical_port_name))

def mapping_event_from_change_event(status, port_dict):
    """
    Translate the result of get_transceiver_change_event into one of the
    events understood by the state machine.

    Args:
        status: truthy when the change-event call reported success.
        port_dict: dict returned together with status. It is updated in
            place: whenever the event is derived here rather than read from
            the dict, it is stored under the EVENT_ON_ALL_SFP key.

    Returns:
        NORMAL_EVENT if status is truthy and port_dict is non-empty;
        SYSTEM_BECOME_READY if status is truthy and port_dict is empty;
        port_dict[EVENT_ON_ALL_SFP] if status is falsy and that key exists;
        SYSTEM_FAIL otherwise.
    """
    if status:
        if port_dict:
            return NORMAL_EVENT
        # A plain timeout (success with an empty port_dict) is reported as
        # SYSTEM_BECOME_READY so that the state machine can handle it.
        port_dict[EVENT_ON_ALL_SFP] = SYSTEM_BECOME_READY
        return SYSTEM_BECOME_READY

    if EVENT_ON_ALL_SFP in port_dict:
        return port_dict[EVENT_ON_ALL_SFP]

    # Failure without a system-level event should not happen; fall back to
    # SYSTEM_FAIL as a safeguard.
    port_dict[EVENT_ON_ALL_SFP] = SYSTEM_FAIL
    return SYSTEM_FAIL

# Timer thread wrapper class to update dom info to DB periodically
class dom_info_update_task:
Expand Down Expand Up @@ -403,7 +447,7 @@ def main():
sel.addSelectable(sst)

# Make sure this daemon started after all port configured.
while True:
while XCVRD_MAIN_TASK_RUNNING_FLAG:
(state, c) = sel.select(SELECT_TIMEOUT_MSECS)
if state == swsscommon.Select.TIMEOUT:
continue
Expand All @@ -426,41 +470,161 @@ def main():
dom_info_update.task_run()

# Start main loop to listen to the SFP change event.
# The state transition sequence:
# 1. When the system starts, it is in the "INIT" state, calling get_transceiver_change_event
# with RETRY_PERIOD_FOR_SYSTEM_READY_MSECS as the timeout, for up to RETRY_TIMES_FOR_SYSTEM_READY
# times.
# 2. Once 'system_become_ready' is returned, the system enters the "NORMAL" state and starts to monitor
# the insertion/removal events of all the SFP modules.
# In this state, receiving 'system_not_ready' or 'system_fail' is treated as an unrecoverable error
# and causes the daemon to exit.

# states definition
# - Initial state: INIT, before a system-ready or a normal event has been received
# - Final state: EXIT
# - Other state: NORMAL, after a system-ready or a normal event has been received

# events definition
# - SYSTEM_NOT_READY
# - SYSTEM_BECOME_READY
#   - also used for a timeout, i.e. get_transceiver_change_event returned status = True
#     with an empty port_dict
# - NORMAL_EVENT
#   - sfp insertion/removal (status = True with a non-empty port_dict)
# - SYSTEM_FAIL

# State transitions:
# 1. SYSTEM_NOT_READY
#    - INIT
#      - retry < RETRY_TIMES_FOR_SYSTEM_READY
#        retry ++
#      - else
#        max retry reached, treat as fatal, exit
#    - NORMAL
#      Treat as a fatal error, exit
# 2. SYSTEM_BECOME_READY
#    - INIT
#      transition to NORMAL
#    - NORMAL
#      log the event
#      nop
# 3. NORMAL_EVENT
#    - INIT (for the vendors who don't implement SYSTEM_BECOME_READY)
#      transition to NORMAL
#      handle the event normally
#    - NORMAL
#      handle the event normally
# 4. SYSTEM_FAIL
#    treat as a fatal error

# State event next state
# INIT SYSTEM NOT READY INIT / EXIT
# INIT SYSTEM BECOME READY NORMAL
# NORMAL SYSTEM BECOME READY NORMAL
# INIT/NORMAL SYSTEM FAIL EXIT
# INIT/NORMAL NORMAL EVENT NORMAL
# NORMAL SYSTEM NOT READY EXIT
# EXIT -

log_info("Start main loop")
time_last_recovery_run = time.time()
while True:
status, port_dict = platform_sfputil.get_transceiver_change_event(XCVRD_MAIN_THREAD_SLEEP_MSECS)
if status:
for key, value in port_dict.iteritems():
logical_port_list = platform_sfputil.get_physical_to_logical(int(key))
for logical_port in logical_port_list:
if value == SFP_STATUS_INSERTED:
rc = post_port_sfp_info_to_db(logical_port, int_tbl)
# If we didn't get the sfp info, assuming the eeprom is not ready, give a try again.
if rc == SFP_EEPROM_NOT_READY:
log_info("Port {} isn't present when got SFP insert event".format(logical_port))
retry = 0
while retry <= RETRIES_FOR_SPF_READY:
time.sleep(TIME_FOR_SFP_READY_SECS)
rc = post_port_sfp_info_to_db(logical_port, int_tbl)
if rc == SFP_EEPROM_NOT_READY:
log_info("Port {} isn't present when got SFP insert event, retry {}".format(logical_port, retry))
retry = retry + 1
else:
break
retry = 0
timeout = RETRY_PERIOD_FOR_SYSTEM_READY_MSECS
state = STATE_INIT
while XCVRD_MAIN_TASK_RUNNING_FLAG:
next_state = state
status, port_dict = platform_sfputil.get_transceiver_change_event(timeout)
event = mapping_event_from_change_event(status, port_dict)

if event == SYSTEM_NOT_READY:
if state == STATE_INIT:
# system not ready, wait and retry
if retry >= RETRY_TIMES_FOR_SYSTEM_READY:
log_error("System failed to get ready in {} secs or received system error. Exiting...".format((RETRY_PERIOD_FOR_SYSTEM_READY_MSECS/1000)*RETRY_TIMES_FOR_SYSTEM_READY))
next_state = STATE_EXIT
else:
retry = retry + 1

# get_transceiver_change_event may return immediately,
# we want the retry expired in expected time period,
# So need to calc the time diff,
# if time diff less that the pre-defined waiting time,
# use sleep() to complete the time.
time_now = time.time()
time_diff = time_now - time_start
if time_diff < RETRY_PERIOD_FOR_SYSTEM_READY_MSECS/1000:
time.sleep(RETRY_PERIOD_FOR_SYSTEM_READY_MSECS/1000 - time_diff)
elif state == STATE_NORMAL:
log_error("Got system_not_ready in normal state, treat as fatal. Exiting...")
next_state = STATE_EXIT
else:
next_state = STATE_EXIT
elif event == SYSTEM_BECOME_READY:
if state == STATE_INIT:
next_state = STATE_NORMAL
log_info("Got system_become_ready in init state, transmit to normal state")
elif state == STATE_NORMAL:
next_state = STATE_NORMAL
else:
next_state = STATE_EXIT


elif event == NORMAL_EVENT:
if state == STATE_NORMAL or state == STATE_INIT:
if state == STATE_INIT:
next_state = STATE_NORMAL
# this is the originally logic that handled the transceiver change event
# this can be reached in two cases:
# 1. the state has been normal before got the event
# 2. the state was init and is transmitted to normal after got the event.
# this is for the vendors who don't implement "system_not_ready/system_becom_ready" logic
for key, value in port_dict.iteritems():
logical_port_list = platform_sfputil.get_physical_to_logical(int(key))
for logical_port in logical_port_list:
if value == SFP_STATUS_INSERTED:
log_info("Got SFP inserted event")
rc = post_port_sfp_info_to_db(logical_port, int_tbl)
# If we didn't get the sfp info, assuming the eeprom is not ready, give a try again.
if rc == SFP_EEPROM_NOT_READY:
log_info("Port {} isn't present when got SFP insert event".format(logical_port))
retry = 0
while retry <= RETRIES_FOR_SPF_READY:
time.sleep(TIME_FOR_SFP_READY_SECS)
rc = post_port_sfp_info_to_db(logical_port, int_tbl)
if rc == SFP_EEPROM_NOT_READY:
log_info("Port {} isn't present when got SFP insert event, retry {}".format(logical_port, retry))
retry = retry + 1
else:
break
else:
log_info("get sfp info successfully {}, push to db".format(logical_port))
post_port_dom_info_to_db(logical_port, dom_tbl)

elif value == SFP_STATUS_REMOVED:
log_info("Got SFP removed event")
del_port_sfp_dom_info_to_db(logical_port, int_tbl, dom_tbl)
else:
log_info("get sfp info successfully {}, push to db".format(logical_port))
post_port_dom_info_to_db(logical_port, dom_tbl)

elif value == SFP_STATUS_REMOVED:
del_port_sfp_dom_info_to_db(logical_port, int_tbl, dom_tbl)
else:
# TODO, SFP return error code, need handle accordingly.
continue
# TODO, SFP return error code, need handle accordingly.
log_warning("Got unknown event {}, ignored".format(value))
continue
else:
next_state = STATE_EXIT
elif event == SYSTEM_FAIL:
# no matter which state current it is, it's fatal
next_state = STATE_EXIT
log_error("Got system_fail event on state {}, exiting".format(state))
else:
# If get_transceiver_change_event() return error, will clean up the DB and then exit
# TODO: next step need to define more error types to handle accordingly.
log_warning("Got unknown event {} on state {}.".format(event, state))

if next_state != state:
log_info("State transmitted from {} to {}".format(state, next_state))
state = next_state

if next_state == STATE_EXIT:
break
elif next_state == STATE_NORMAL:
# When transit to normal state time out will be changed
timeout = XCVRD_MAIN_THREAD_SLEEP_MSECS

time_now = time.time()
time_diff = time_now - time_last_recovery_run
Expand All @@ -476,7 +640,7 @@ def main():
logical_port_list = platform_sfputil.logical
for logical_port_name in logical_port_list:
del_port_sfp_dom_info_to_db(logical_port_name, int_tbl, dom_tbl)
log_error("Error: return error from get_transceiver_change_event(), exiting...")
log_error("Xcvrd main task stopped, exiting...")
return 1

if __name__ == '__main__':
Expand Down

0 comments on commit 1039764

Please sign in to comment.