Skip to content

Commit

Permalink
Improve feature mode switch process (#12188)
Browse files Browse the repository at this point in the history
* Fix kube mode to local mode long duration issue

* Remove IPV6 parameters which is not necessary

* Fix read node labels bug

* Tag the running image to latest if it's stable

* Disable image_version_higher check

* Change image_version_higher checker test case

Signed-off-by: Yun Li <[email protected]>
  • Loading branch information
lixiaoyuner authored Nov 2, 2022
1 parent a31a4e7 commit e1440f0
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 51 deletions.
1 change: 1 addition & 0 deletions src/sonic-ctrmgrd/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ tests/__pycache__/
ctrmgr/__pycache__/
venv
tests/.coverage*
.pytest_cache/
25 changes: 20 additions & 5 deletions src/sonic-ctrmgrd/ctrmgr/container
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ STATE = "state"

KUBE_LABEL_TABLE = "KUBE_LABELS"
KUBE_LABEL_SET_KEY = "SET"
SERVER_TABLE = "KUBERNETES_MASTER"
SERVER_KEY = "SERVER"
ST_SER_CONNECTED = "connected"
ST_SER_UPDATE_TS = "update_time"

# Get seconds to wait for remote docker to start.
# If not, revert to local
Expand Down Expand Up @@ -75,8 +79,10 @@ def read_data(is_config, feature, fields):
ret = []

db = cfg_db if is_config else state_db

tbl = swsscommon.Table(db, FEATURE_TABLE)
if feature == SERVER_KEY:
tbl = swsscommon.Table(db, SERVER_TABLE)
else:
tbl = swsscommon.Table(db, FEATURE_TABLE)

data = dict(tbl.get(feature)[1])
for (field, default) in fields:
Expand Down Expand Up @@ -104,6 +110,13 @@ def read_state(feature):
[(CURRENT_OWNER, "none"), (REMOTE_STATE, "none"), (CONTAINER_ID, "")])


def read_server_state():
""" Read requried feature state """

return read_data(False, SERVER_KEY,
[(ST_SER_CONNECTED, "false"), (ST_SER_UPDATE_TS, "")])


def docker_action(action, feature, **kwargs):
""" Execute docker action """
try:
Expand Down Expand Up @@ -192,9 +205,10 @@ def container_start(feature, **kwargs):

set_owner, fallback, _ = read_config(feature)
_, remote_state, _ = read_state(feature)
server_connected, _ = read_server_state()

debug_msg("{}: set_owner:{} fallback:{} remote_state:{}".format(
feature, set_owner, fallback, remote_state))
debug_msg("{}: set_owner:{} fallback:{} remote_state:{} server_connected:{}".format(
feature, set_owner, fallback, remote_state, server_connected))

data = {
SYSTEM_STATE: "up",
Expand All @@ -207,8 +221,9 @@ def container_start(feature, **kwargs):
start_val = START_LOCAL
else:
start_val = START_KUBE
if fallback and (remote_state == "none"):
if fallback and (remote_state == "none" or server_connected == "false"):
start_val |= START_LOCAL
data[REMOTE_STATE] = "none"

if start_val == START_LOCAL:
# Implies *only* local.
Expand Down
16 changes: 8 additions & 8 deletions src/sonic-ctrmgrd/ctrmgr/container_startup.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,14 +232,14 @@ def container_up(feature, owner, version):
do_freeze(feature, "This version is marked disabled. Exiting ...")
return

if not instance_higher(feature, state_data[VERSION], version):
# TODO: May Remove label <feature_name>_<version>_enabled
# Else kubelet will continue to re-deploy every 5 mins, until
# master removes the lable to un-deploy.
#
do_freeze(feature, "bail out as current deploy version {} is not higher".
format(version))
return
# if not instance_higher(feature, state_data[VERSION], version):
# # TODO: May Remove label <feature_name>_<version>_enabled
# # Else kubelet will continue to re-deploy every 5 mins, until
# # master removes the lable to un-deploy.
# #
# do_freeze(feature, "bail out as current deploy version {} is not higher".
# format(version))
# return

update_data(state_db, feature, { VERSION: version })

Expand Down
75 changes: 70 additions & 5 deletions src/sonic-ctrmgrd/ctrmgr/ctrmgrd.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
CFG_SER_IP: "",
CFG_SER_PORT: "6443",
CFG_SER_DISABLE: "false",
CFG_SER_INSECURE: "false"
CFG_SER_INSECURE: "true"
}

dflt_st_ser = {
Expand Down Expand Up @@ -88,18 +88,20 @@
JOIN_LATENCY = "join_latency_on_boot_seconds"
JOIN_RETRY = "retry_join_interval_seconds"
LABEL_RETRY = "retry_labels_update_seconds"
TAG_IMAGE_LATEST = "tag_latest_image_on_wait_seconds"
USE_K8S_PROXY = "use_k8s_as_http_proxy"

remote_ctr_config = {
JOIN_LATENCY: 10,
JOIN_RETRY: 10,
LABEL_RETRY: 2,
TAG_IMAGE_LATEST: 30,
USE_K8S_PROXY: ""
}

def log_debug(m):
msg = "{}: {}".format(inspect.stack()[1][3], m)
print(msg)
#print(msg)
syslog.syslog(syslog.LOG_DEBUG, msg)


Expand Down Expand Up @@ -148,6 +150,8 @@ def init():
with open(SONIC_CTR_CONFIG, "r") as s:
d = json.load(s)
remote_ctr_config.update(d)
if UNIT_TESTING:
remote_ctr_config[TAG_IMAGE_LATEST] = 0


class MainServer:
Expand All @@ -172,11 +176,11 @@ def register_db(self, db_name):
self.db_connectors[db_name] = swsscommon.DBConnector(db_name, 0)


def register_timer(self, ts, handler):
def register_timer(self, ts, handler, args=()):
""" Register timer based handler.
The handler will be called on/after give timestamp, ts
"""
self.timer_handlers[ts].append(handler)
self.timer_handlers[ts].append((handler, args))


def register_handler(self, db_name, table_name, handler):
Expand Down Expand Up @@ -235,7 +239,7 @@ def run(self):
lst = self.timer_handlers[k]
del self.timer_handlers[k]
for fn in lst:
fn()
fn[0](*fn[1])
else:
timeout = (k - ct_ts).seconds
break
Expand Down Expand Up @@ -426,6 +430,54 @@ def do_join(self, ip, port, insecure):
format(remote_ctr_config[JOIN_RETRY], self.start_time))


def tag_latest_image(server, feat, docker_id, image_ver):
res = 1
if not UNIT_TESTING:
status = os.system("docker ps |grep {} >/dev/null".format(docker_id))
if status:
syslog.syslog(syslog.LOG_ERR,
"Feature {}:{} is not stable".format(feat, image_ver))
else:
image_item = os.popen("docker inspect {} |jq -r .[].Image".format(docker_id)).read().strip()
if image_item:
image_id = image_item.split(":")[1][:12]
image_info = os.popen("docker images |grep {}".format(image_id)).read().split()
if image_info:
image_rep = image_info[0]
res = os.system("docker tag {} {}:latest".format(image_id, image_rep))
if res != 0:
syslog.syslog(syslog.LOG_ERR,
"Failed to tag {}:{} to latest".format(image_rep, image_ver))
else:
syslog.syslog(syslog.LOG_INFO,
"Successfully tag {}:{} to latest".format(image_rep, image_ver))
feat_status = os.popen("docker inspect {} |jq -r .[].State.Running".format(feat)).read().strip()
if feat_status:
if feat_status == 'true':
os.system("docker stop {}".format(feat))
syslog.syslog(syslog.LOG_ERR,
"{} should not run, stop it".format(feat))
os.system("docker rm {}".format(feat))
syslog.syslog(syslog.LOG_INFO,
"Delete previous {} container".format(feat))
else:
syslog.syslog(syslog.LOG_ERR,
"Failed to docker images |grep {} to get image repo".format(image_id))
else:
syslog.syslog(syslog.LOG_ERR,
"Failed to inspect container:{} to get image id".format(docker_id))
else:
server.mod_db_entry(STATE_DB_NAME,
FEATURE_TABLE, feat, {"tag_latest": "true"})
res = 0
if res:
log_debug("failed to tag {}:{} to latest".format(feat, image_ver))
else:
log_debug("successfully tag {}:{} to latest".format(feat, image_ver))

return res


#
# Feature changes
#
Expand Down Expand Up @@ -523,6 +575,19 @@ def on_state_update(self, key, op, data):
self.st_data[key] = _update_entry(dflt_st_feat, data)
remote_state = self.st_data[key][ST_FEAT_REMOTE_STATE]

if (old_remote_state != remote_state) and (remote_state == "running"):
# Tag latest
start_time = datetime.datetime.now() + datetime.timedelta(
seconds=remote_ctr_config[TAG_IMAGE_LATEST])
self.server.register_timer(start_time, tag_latest_image, (
self.server,
key,
self.st_data[key][ST_FEAT_CTR_ID],
self.st_data[key][ST_FEAT_CTR_VER]))

log_debug("try to tag latest label after {} seconds @{}".format(
remote_ctr_config[TAG_IMAGE_LATEST], start_time))

if (not init) and (
(old_remote_state == remote_state) or (remote_state != "pending")):
# no change or nothing to do.
Expand Down
8 changes: 4 additions & 4 deletions src/sonic-ctrmgrd/ctrmgr/kube_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def _run_command(cmd, timeout=5):

def kube_read_labels():
""" Read current labels on node and return as dict. """
KUBECTL_GET_CMD = "kubectl --kubeconfig {} get nodes {} --show-labels |tr -s ' ' | cut -f6 -d' '"
KUBECTL_GET_CMD = "kubectl --kubeconfig {} get nodes {} --show-labels --no-headers |tr -s ' ' | cut -f6 -d' '"

labels = {}
ret, out, _ = _run_command(KUBECTL_GET_CMD.format(
Expand Down Expand Up @@ -332,12 +332,12 @@ def _do_reset(pending_join = False):


def _do_join(server, port, insecure):
KUBEADM_JOIN_CMD = "kubeadm join --discovery-file {} --node-name {} --apiserver-advertise-address {}"
KUBEADM_JOIN_CMD = "kubeadm join --discovery-file {} --node-name {}"
err = ""
out = ""
ret = 0
try:
local_ipv6 = _get_local_ipv6()
#local_ipv6 = _get_local_ipv6()
#_download_file(server, port, insecure)
_gen_cli_kubeconf(server, port, insecure)
_do_reset(True)
Expand All @@ -349,7 +349,7 @@ def _do_join(server, port, insecure):

if ret == 0:
(ret, out, err) = _run_command(KUBEADM_JOIN_CMD.format(
KUBE_ADMIN_CONF, get_device_name(), local_ipv6), timeout=60)
KUBE_ADMIN_CONF, get_device_name()), timeout=60)
log_debug("ret = {}".format(ret))

except IOError as e:
Expand Down
1 change: 1 addition & 0 deletions src/sonic-ctrmgrd/ctrmgr/remote_ctr.config.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"retry_join_interval_seconds": 30,
"retry_labels_update_seconds": 5,
"revert_to_local_on_wait_seconds": 60,
"tag_latest_image_on_wait_seconds": 600,
"use_k8s_as_http_proxy": "n"
}

2 changes: 1 addition & 1 deletion src/sonic-ctrmgrd/tests/container_startup_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@
common_test.FEATURE_TABLE: {
"snmp": {
"container_id": "no_change",
"container_version": "20201230.77",
"container_version": "20201230.11",
"current_owner": "no_change",
"remote_state": "no_change",
"system_state": "up"
Expand Down
5 changes: 5 additions & 0 deletions src/sonic-ctrmgrd/tests/container_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@
"current_owner": "none",
"container_id": ""
}
},
common_test.SERVER_TABLE: {
"SERVER": {
"connected": "true"
}
}
}
},
Expand Down
49 changes: 47 additions & 2 deletions src/sonic-ctrmgrd/tests/ctrmgrd_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@
common_test.KUBE_JOIN: {
"ip": "10.10.10.10",
"port": "6443",
"insecure": "false"
"insecure": "true"
}
}
},
Expand Down Expand Up @@ -151,7 +151,7 @@
common_test.KUBE_JOIN: {
"ip": "10.10.10.10",
"port": "6443",
"insecure": "false"
"insecure": "true"
},
common_test.KUBE_RESET: {
"flag": "true"
Expand Down Expand Up @@ -276,6 +276,51 @@
}
}
}
},
3: {
common_test.DESCR: "Tag image latest when remote_state changes to running",
common_test.ARGS: "ctrmgrd",
common_test.PRE: {
common_test.CONFIG_DB_NO: {
common_test.FEATURE_TABLE: {
"snmp": {
"set_owner": "kube"
}
}
},
common_test.STATE_DB_NO: {
common_test.FEATURE_TABLE: {
"snmp": {
"remote_state": "pending"
}
}
}
},
common_test.UPD: {
common_test.CONFIG_DB_NO: {
common_test.FEATURE_TABLE: {
"snmp": {
"set_owner": "kube"
}
}
},
common_test.STATE_DB_NO: {
common_test.FEATURE_TABLE: {
"snmp": {
"remote_state": "running"
}
}
}
},
common_test.POST: {
common_test.STATE_DB_NO: {
common_test.FEATURE_TABLE: {
"snmp": {
"tag_latest": "true"
}
}
}
}
}
}

Expand Down
Loading

0 comments on commit e1440f0

Please sign in to comment.