Skip to content

Commit

Permalink
Interact with Redis by str instead of bytes, migrate to SonicV2Connec…
Browse files Browse the repository at this point in the history
…tor with `decode_responses=True` (sonic-net#171)

* Interact with Redis by str instead of bytes
* Implement mockredis decode_responses option
  • Loading branch information
qiluo-msft authored Oct 22, 2020
1 parent 4ab6e41 commit 57e54d9
Show file tree
Hide file tree
Showing 17 changed files with 182 additions and 164 deletions.
71 changes: 35 additions & 36 deletions src/sonic_ax_impl/mibs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@
from swsssdk import SonicV2Connector
from swsssdk import SonicDBConfig
from swsssdk import port_util
from swsssdk.port_util import get_index, get_index_from_str
from swsssdk.port_util import get_index_from_str
from ax_interface.mib import MIBUpdater
from ax_interface.util import oid2tuple
from sonic_ax_impl import logger

COUNTERS_PORT_NAME_MAP = b'COUNTERS_PORT_NAME_MAP'
COUNTERS_QUEUE_NAME_MAP = b'COUNTERS_QUEUE_NAME_MAP'
LAG_TABLE = b'LAG_TABLE'
LAG_MEMBER_TABLE = b'LAG_MEMBER_TABLE'
LOC_CHASSIS_TABLE = b'LLDP_LOC_CHASSIS'
COUNTERS_PORT_NAME_MAP = 'COUNTERS_PORT_NAME_MAP'
COUNTERS_QUEUE_NAME_MAP = 'COUNTERS_QUEUE_NAME_MAP'
LAG_TABLE = 'LAG_TABLE'
LAG_MEMBER_TABLE = 'LAG_MEMBER_TABLE'
LOC_CHASSIS_TABLE = 'LLDP_LOC_CHASSIS'
APPL_DB = 'APPL_DB'
ASIC_DB = 'ASIC_DB'
COUNTERS_DB = 'COUNTERS_DB'
Expand Down Expand Up @@ -78,14 +78,14 @@ def counter_table(sai_id):
:param if_name: given sai_id to cast.
:return: COUNTERS table key.
"""
return b'COUNTERS:oid:0x' + sai_id
return 'COUNTERS:oid:0x' + sai_id

def queue_table(sai_id):
"""
:param sai_id: given sai_id to cast.
:return: COUNTERS table key.
"""
return b'COUNTERS:' + sai_id
return 'COUNTERS:' + sai_id

def queue_key(port_index, queue_index):
return str(port_index) + ':' + str(queue_index)
Expand All @@ -111,23 +111,23 @@ def lldp_entry_table(if_name):
:param if_name: given interface to cast.
:return: LLDP_ENTRY_TABLE key.
"""
return b'LLDP_ENTRY_TABLE:' + if_name
return 'LLDP_ENTRY_TABLE:' + if_name


def if_entry_table(if_name):
"""
:param if_name: given interface to cast.
:return: PORT_TABLE key.
"""
return b'PORT_TABLE:' + if_name
return 'PORT_TABLE:' + if_name


def lag_entry_table(lag_name):
"""
:param lag_name: given lag to cast.
:return: LAG_TABLE key.
"""
return b'LAG_TABLE:' + lag_name
return 'LAG_TABLE:' + lag_name


def mgmt_if_entry_table(if_name):
Expand All @@ -136,7 +136,7 @@ def mgmt_if_entry_table(if_name):
:return: MGMT_PORT_TABLE key
"""

return b'MGMT_PORT|' + if_name
return 'MGMT_PORT|' + if_name


def mgmt_if_entry_table_state_db(if_name):
Expand All @@ -145,7 +145,7 @@ def mgmt_if_entry_table_state_db(if_name):
:return: MGMT_PORT_TABLE key
"""

return b'MGMT_PORT_TABLE|' + if_name
return 'MGMT_PORT_TABLE|' + if_name

def get_sai_id_key(namespace, sai_id):
"""
Expand All @@ -157,7 +157,7 @@ def get_sai_id_key(namespace, sai_id):
Return value: namespace:sai id or sai id
"""
if namespace != '':
return namespace.encode() + b':' + sai_id
return namespace + ':' + sai_id
else:
return sai_id

Expand All @@ -166,15 +166,16 @@ def split_sai_id_key(sai_id_key):
Input - bytes
Return namespace string and sai id in byte string.
"""
result = sai_id_key.split(b':')
result = sai_id_key.split(':')
if len(result) == 1:
return '', sai_id_key
else:
return result[0].decode(), result[1]
return result[0], result[1]

def config(**kwargs):
global redis_kwargs
redis_kwargs = {k:v for (k,v) in kwargs.items() if k in ['unix_socket_path', 'host', 'port']}
redis_kwargs['decode_responses'] = True

def init_db():
"""
Expand All @@ -198,21 +199,21 @@ def init_mgmt_interface_tables(db_conn):
db_conn.connect(CONFIG_DB)
db_conn.connect(STATE_DB)

mgmt_ports_keys = db_conn.keys(CONFIG_DB, mgmt_if_entry_table(b'*'))
mgmt_ports_keys = db_conn.keys(CONFIG_DB, mgmt_if_entry_table('*'))

if not mgmt_ports_keys:
logger.debug('No managment ports found in {}'.format(mgmt_if_entry_table(b'')))
logger.debug('No managment ports found in {}'.format(mgmt_if_entry_table('')))
return {}, {}

mgmt_ports = [key.split(mgmt_if_entry_table(b''))[-1] for key in mgmt_ports_keys]
oid_name_map = {get_index(mgmt_name): mgmt_name for mgmt_name in mgmt_ports}
mgmt_ports = [key.split(mgmt_if_entry_table(''))[-1] for key in mgmt_ports_keys]
oid_name_map = {get_index_from_str(mgmt_name): mgmt_name for mgmt_name in mgmt_ports}
logger.debug('Managment port map:\n' + pprint.pformat(oid_name_map, indent=2))

if_alias_map = dict()

for if_name in oid_name_map.values():
if_entry = db_conn.get_all(CONFIG_DB, mgmt_if_entry_table(if_name), blocking=True)
if_alias_map[if_name] = if_entry.get(b'alias', if_name)
if_alias_map[if_name] = if_entry.get('alias', if_name)

logger.debug("Management alias map:\n" + pprint.pformat(if_alias_map, indent=2))

Expand All @@ -230,7 +231,7 @@ def init_sync_d_interface_tables(db_conn):
# ex: { "Ethernet76" : "1000000000023" }
if_name_map_util, if_id_map_util = port_util.get_interface_oid_map(db_conn)
for if_name, sai_id in if_name_map_util.items():
if_name_str = if_name.decode()
if_name_str = if_name
if (re.match(port_util.SONIC_ETHERNET_RE_PATTERN, if_name_str) or \
re.match(port_util.SONIC_ETHERNET_BP_RE_PATTERN, if_name_str)):
if_name_map[if_name] = sai_id
Expand All @@ -239,17 +240,16 @@ def init_sync_d_interface_tables(db_conn):
# string or in sai id.
# sai_id_key = namespace : sai_id
for sai_id, if_name in if_id_map_util.items():
if_name = if_name.decode()
if (re.match(port_util.SONIC_ETHERNET_RE_PATTERN, if_name) or \
re.match(port_util.SONIC_ETHERNET_BP_RE_PATTERN, if_name)):
if_id_map[get_sai_id_key(db_conn.namespace, sai_id)] = if_name
logger.debug("Port name map:\n" + pprint.pformat(if_name_map, indent=2))
logger.debug("Interface name map:\n" + pprint.pformat(if_id_map, indent=2))

# { OID -> if_name (SONiC) }
oid_name_map = {get_index(if_name): if_name for if_name in if_name_map
oid_name_map = {get_index_from_str(if_name): if_name for if_name in if_name_map
# only map the interface if it's a style understood to be a SONiC interface.
if get_index(if_name) is not None}
if get_index_from_str(if_name) is not None}

logger.debug("OID name map:\n" + pprint.pformat(oid_name_map, indent=2))

Expand All @@ -272,7 +272,7 @@ def init_sync_d_interface_tables(db_conn):

for if_name in if_name_map:
if_entry = db_conn.get_all(APPL_DB, if_entry_table(if_name), blocking=True)
if_alias_map[if_name] = if_entry.get(b'alias', if_name)
if_alias_map[if_name] = if_entry.get('alias', if_name)

logger.debug("Chassis name map:\n" + pprint.pformat(if_alias_map, indent=2))

Expand All @@ -295,28 +295,28 @@ def init_sync_d_lag_tables(db_conn):

db_conn.connect(APPL_DB)

lag_entries = db_conn.keys(APPL_DB, b"LAG_TABLE:*")
lag_entries = db_conn.keys(APPL_DB, "LAG_TABLE:*")

if not lag_entries:
return lag_name_if_name_map, if_name_lag_name_map, oid_lag_name_map

for lag_entry in lag_entries:
lag_name = lag_entry[len(b"LAG_TABLE:"):]
lag_members = db_conn.keys(APPL_DB, b"LAG_MEMBER_TABLE:%s:*" % lag_name)
lag_name = lag_entry[len("LAG_TABLE:"):]
lag_members = db_conn.keys(APPL_DB, "LAG_MEMBER_TABLE:%s:*" % lag_name)
# TODO: db_conn.keys() should really return [] instead of None
if lag_members is None:
lag_members = []

def member_name_str(val, lag_name):
return val[len(b"LAG_MEMBER_TABLE:%s:" % lag_name):]
return val[len("LAG_MEMBER_TABLE:%s:" % lag_name):]

lag_member_names = [member_name_str(m, lag_name) for m in lag_members]
lag_name_if_name_map[lag_name] = lag_member_names
for lag_member_name in lag_member_names:
if_name_lag_name_map[lag_member_name] = lag_name

for if_name in lag_name_if_name_map.keys():
idx = get_index(if_name)
idx = get_index_from_str(if_name)
if idx:
oid_lag_name_map[idx] = if_name

Expand All @@ -342,7 +342,7 @@ def init_sync_d_queue_tables(db_conn):
port_queue_list_map = {}

for queue_name, sai_id in queue_name_map.items():
port_name, queue_index = queue_name.decode().split(':')
port_name, queue_index = queue_name.split(':')
queue_index = ''.join(i for i in queue_index if i.isdigit())
port_index = get_index_from_str(port_name)
key = queue_key(port_index, queue_index)
Expand Down Expand Up @@ -464,12 +464,11 @@ def update_data(self):
keys = []

for key in keys:
key = key.decode()
oid = oid2tuple(key, dot_prefix=False)
self.oid_list.append(oid)
value = Namespace.dbs_get_all(self.db_conn, SNMP_OVERLAY_DB, key)
if value[b'type'] in [b'COUNTER_32', b'COUNTER_64']:
self.oid_map[oid] = int(value[b'data'])
if value['type'] in ['COUNTER_32', 'COUNTER_64']:
self.oid_map[oid] = int(value['data'])
else:
raise ValueError("Invalid value type")

Expand All @@ -486,7 +485,7 @@ def init_namespace_dbs():
db_conn = []
SonicDBConfig.load_sonic_global_db_config()
for namespace in SonicDBConfig.get_ns_list():
db = SonicV2Connector(use_unix_socket_path=True, namespace=namespace)
db = SonicV2Connector(use_unix_socket_path=True, namespace=namespace, decode_responses=True)
db_conn.append(db)

Namespace.connect_namespace_dbs(db_conn)
Expand Down
46 changes: 23 additions & 23 deletions src/sonic_ax_impl/mibs/ieee802_1ab.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def poll_lldp_entry_updates(pubsub):
return ret

try:
interface = msg["channel"].split(b":")[-1].decode()
interface = msg["channel"].split(":")[-1]
data = msg['data']
except (KeyError, AttributeError) as e:
logger.error("Invalid msg when polling for lldp updates: {}\n"
Expand Down Expand Up @@ -112,8 +112,8 @@ def reinit_data(self):
# establish connection to application database.
Namespace.connect_all_dbs(self.db_conn, mibs.APPL_DB)
self.loc_chassis_data = Namespace.dbs_get_all(self.db_conn, mibs.APPL_DB, mibs.LOC_CHASSIS_TABLE)
self.loc_chassis_data[b'lldp_loc_sys_cap_supported'] = parse_sys_capability(self.loc_chassis_data[b'lldp_loc_sys_cap_supported'])
self.loc_chassis_data[b'lldp_loc_sys_cap_enabled'] = parse_sys_capability(self.loc_chassis_data[b'lldp_loc_sys_cap_enabled'])
self.loc_chassis_data['lldp_loc_sys_cap_supported'] = parse_sys_capability(self.loc_chassis_data['lldp_loc_sys_cap_supported'])
self.loc_chassis_data['lldp_loc_sys_cap_enabled'] = parse_sys_capability(self.loc_chassis_data['lldp_loc_sys_cap_enabled'])
def update_data(self):
"""
Avoid NotImplementedError
Expand All @@ -123,7 +123,7 @@ def update_data(self):

def table_lookup(self, table_name):
try:
_table_name = bytes(getattr(table_name, 'name', table_name), 'utf-8')
_table_name = getattr(table_name, 'name', table_name)
return self.loc_chassis_data[_table_name]
except KeyError as e:
logger.warning(" 0 - b'LOC_CHASSIS' missing attribute '{}'.".format(e))
Expand Down Expand Up @@ -229,13 +229,13 @@ def _update_per_namespace_data(self, pubsub):
if not data:
break

if b"set" in data:
self.update_interface_data(interface.encode())
if "set" in data:
self.update_interface_data(interface)

def update_data(self):
for i in range(len(self.db_conn)):
if not self.pubsub[i]:
pattern = mibs.lldp_entry_table(b'*')
pattern = mibs.lldp_entry_table('*')
self.pubsub[i] = mibs.get_redis_pubsub(self.db_conn[i], self.db_conn[i].APPL_DB, pattern)
self._update_per_namespace_data(self.pubsub[i])

Expand Down Expand Up @@ -270,7 +270,7 @@ def port_table_lookup(self, sub_id, table_name):
# no data for this interface
return None
counters = self.loc_port_data[if_name]
_table_name = bytes(getattr(table_name, 'name', table_name), 'utf-8')
_table_name = getattr(table_name, 'name', table_name)

return counters.get(_table_name, '')

Expand Down Expand Up @@ -302,13 +302,13 @@ def reinit_data(self):

# establish connection to application database.
self.db_conn.connect(mibs.APPL_DB)
mgmt_ip_bytes = self.db_conn.get(mibs.APPL_DB, mibs.LOC_CHASSIS_TABLE, b'lldp_loc_man_addr')
mgmt_ip_bytes = self.db_conn.get(mibs.APPL_DB, mibs.LOC_CHASSIS_TABLE, 'lldp_loc_man_addr')

if not mgmt_ip_bytes:
logger.warning("Missing lldp_loc_man_addr from APPL DB")
return

self.mgmt_ip_str = mgmt_ip_bytes.decode()
self.mgmt_ip_str = mgmt_ip_bytes
logger.debug("Got mgmt ip from db : {}".format(self.mgmt_ip_str))
try:
addr_subtype_sub_oid = 4
Expand Down Expand Up @@ -435,12 +435,12 @@ def update_data(self):
# To avoid repeating the data of same interface index with different remote
# time mark, remote time mark is made as 0 in the OID indexing.
time_mark = 0
remote_index = int(lldp_kvs[b'lldp_rem_index'])
remote_index = int(lldp_kvs['lldp_rem_index'])
self.if_range.append((time_mark,
if_oid,
remote_index))
lldp_kvs[b'lldp_rem_sys_cap_supported'] = parse_sys_capability(lldp_kvs[b'lldp_rem_sys_cap_supported'])
lldp_kvs[b'lldp_rem_sys_cap_enabled'] = parse_sys_capability(lldp_kvs[b'lldp_rem_sys_cap_enabled'])
lldp_kvs['lldp_rem_sys_cap_supported'] = parse_sys_capability(lldp_kvs['lldp_rem_sys_cap_supported'])
lldp_kvs['lldp_rem_sys_cap_enabled'] = parse_sys_capability(lldp_kvs['lldp_rem_sys_cap_enabled'])
self.lldp_counters.update({if_name: lldp_kvs})
except (KeyError, AttributeError) as e:
logger.warning("Exception when updating lldpRemTable: {}".format(e))
Expand All @@ -467,7 +467,7 @@ def lldp_table_lookup(self, sub_id, table_name):
# no LLDP data for this interface
return None
counters = self.lldp_counters[if_name]
_table_name = bytes(getattr(table_name, 'name', table_name), 'utf-8')
_table_name = getattr(table_name, 'name', table_name)
try:
return counters[_table_name]
except KeyError as e:
Expand Down Expand Up @@ -500,17 +500,17 @@ def __init__(self):

def update_rem_if_mgmt(self, if_oid, if_name):
lldp_kvs = Namespace.dbs_get_all(self.db_conn, mibs.APPL_DB, mibs.lldp_entry_table(if_name))
if not lldp_kvs or b'lldp_rem_man_addr' not in lldp_kvs:
if not lldp_kvs or 'lldp_rem_man_addr' not in lldp_kvs:
# this interfaces doesn't have remote lldp data, or the peer doesn't advertise his mgmt address
return
try:
mgmt_ip_str = lldp_kvs[b'lldp_rem_man_addr'].decode()
mgmt_ip_str = lldp_kvs['lldp_rem_man_addr']
mgmt_ip_str = mgmt_ip_str.strip()
if len(mgmt_ip_str) == 0:
# the peer advertise an emtpy mgmt address
return
time_mark = int(lldp_kvs[b'lldp_rem_time_mark'])
remote_index = int(lldp_kvs[b'lldp_rem_index'])
time_mark = int(lldp_kvs['lldp_rem_time_mark'])
remote_index = int(lldp_kvs['lldp_rem_index'])
subtype = self.get_subtype(mgmt_ip_str)
ip_hex = self.get_ip_hex(mgmt_ip_str, subtype)
if subtype == ManAddrConst.man_addr_subtype_ipv4:
Expand Down Expand Up @@ -546,17 +546,17 @@ def _update_per_namespace_data(self, pubsub):
if not data:
break

if b"set" in data:
self.update_rem_if_mgmt(if_index, interface.encode())
elif b"del" in data:
if "set" in data:
self.update_rem_if_mgmt(if_index, interface)
elif "del" in data:
# some remote data about that neighbor is gone, del it and try to query again
self.if_range = [sub_oid for sub_oid in self.if_range if sub_oid[0] != if_index]
self.update_rem_if_mgmt(if_index, interface.encode())
self.update_rem_if_mgmt(if_index, interface)

def update_data(self):
for i in range(len(self.db_conn)):
if not self.pubsub[i]:
pattern = mibs.lldp_entry_table(b'*')
pattern = mibs.lldp_entry_table('*')
self.pubsub[i] = mibs.get_redis_pubsub(self.db_conn[i], self.db_conn[i].APPL_DB, pattern)
self._update_per_namespace_data(self.pubsub[i])

Expand Down
Loading

0 comments on commit 57e54d9

Please sign in to comment.