Skip to content

Commit

Permalink
[dualtor-aa][nic_simulator] Improve the upstream flow (#11459)
Browse files Browse the repository at this point in the history
From the ovs doc, if mod-flow is used without --strict, priority is not
used in matching.
This will cause problem for downstream set_drop when
duplicate_nic_upstream is disabled.

For example:

When set_drop is applied to upstream_nic_flow(#1), mod-flow will match both
flow #2 and flow  #3 as priority is not used in flow match.

So let's enforce strict match for mod-flow.

Signed-off-by: Longxiang Lyu <[email protected]>
  • Loading branch information
lolyu authored and mssonicbld committed Feb 15, 2024
1 parent 8602047 commit 11c9af3
Showing 1 changed file with 74 additions and 15 deletions.
89 changes: 74 additions & 15 deletions ansible/dualtor/nic_simulator/nic_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class OVSCommand(object):
OVS_VSCTL_LIST_PORTS_CMD = "ovs-vsctl list-ports {bridge_name}"
OVS_OFCTL_DEL_FLOWS_CMD = "ovs-ofctl del-flows {bridge_name}"
OVS_OFCTL_ADD_FLOWS_CMD = "ovs-ofctl add-flow {bridge_name} {flow}"
OVS_OFCTL_MOD_FLOWS_CMD = "ovs-ofctl mod-flows {bridge_name} {flow}"
OVS_OFCTL_MOD_FLOWS_CMD = "ovs-ofctl --strict mod-flows {bridge_name} {flow}"
OVS_OFCTL_DEL_GROUPS_CMD = "ovs-ofctl -O OpenFlow13 del-groups {bridge_name}"
OVS_OFCTL_ADD_GROUP_CMD = "ovs-ofctl -O OpenFlow13 add-group {bridge_name} {group}"
OVS_OFCTL_MOD_GROUP_CMD = "ovs-ofctl -O OpenFlow13 mod-group {bridge_name} {group}"
Expand Down Expand Up @@ -251,19 +251,21 @@ def set_drop(self, recover=False):
class OVSUpstreamFlow(OVSFlow):
"""Object to represent an OVS upstream flow to output to both ToRs."""

__slots__ = ("drop_output",)
__slots__ = ("drop_output", "enable_output_ports")

def __init__(self, in_port, packet_filter=None, output_ports=[], group=None, priority=None):
def __init__(self, in_port, packet_filter=None, output_ports=[],
group=None, priority=None, enable_output_ports=None):
super(OVSUpstreamFlow, self).__init__(
in_port, packet_filter, output_ports, group, priority)
self.drop_output = [False, False]
self.enable_output_ports = enable_output_ports if enable_output_ports else [True, True]

def to_string(self):
flow_parts = [self._str_prefix]
has_output = False
if self.output_ports:
output = ["output:%s" % port for (portid, port) in enumerate(self.output_ports)
if not self.drop_output[portid]]
if (self.get_port_enable(portid) and (not self.get_drop(portid)))]
has_output = bool(output)
if has_output:
flow_parts.append("actions=%s" % ",".join(output))
Expand All @@ -272,6 +274,9 @@ def to_string(self):
flow_parts.append("actions=drop")
return ",".join(flow_parts)

def get_port_enable(self, portid):
return self.enable_output_ports[portid]

def get_drop(self, portid):
return self.drop_output[portid]

Expand All @@ -283,7 +288,8 @@ def set_drop(self, portid=None, recover=False):
else:
self.drop_output[portid] = is_drop

self.reset()
if self.get_port_enable(portid):
self.reset()


class ForwardingState(object):
Expand Down Expand Up @@ -418,6 +424,8 @@ class OVSBridge(object):
"downstream_upper_tor_flow",
"downstream_lower_tor_flow",
"upstream_nic_flow",
"upstream_upper_tor_nic_flow",
"upstream_lower_tor_nic_flow",
"upstream_loopback2_flow",
"upstream_upper_tor_loopback3_flow",
"upstream_lower_tor_loopback3_flow",
Expand All @@ -426,7 +434,7 @@ class OVSBridge(object):
"flap_counter"
)

def __init__(self, bridge_name, loopback_ips):
def __init__(self, bridge_name, loopback_ips, duplicate_nic_upstream=False):
self.bridge_name = bridge_name
self.loopback2_ip = loopback_ips[0]
self.upper_tor_loopback3_ip = loopback_ips[1]
Expand All @@ -442,7 +450,7 @@ def __init__(self, bridge_name, loopback_ips):
self.flows = []
self.groups = []
self._init_ports()
self._init_flows()
self._init_flows(duplicate_nic_upstream)
self.states_getter = {
1: self.upstream_ecmp_flow.get_upper_tor_forwarding_state,
0: self.upstream_ecmp_flow.get_lower_tor_forwarding_state
Expand Down Expand Up @@ -489,18 +497,38 @@ def _init_ports(self):
self.lower_tor_port
)

def _init_flows(self):
def _init_flows(self, duplicate_nic_upstream=False):
"""Initialize OVS flows for the bridge."""
logging.info("Init flows for bridge %s", self.bridge_name)
self._del_flows()
self._del_groups()
# downstream flows
self.downstream_upper_tor_flow = self._add_flow(self.upper_tor_port,
output_ports=[self.ptf_port, self.server_nic], priority=10)
output_ports=[self.ptf_port, self.server_nic], priority=11)
self.downstream_lower_tor_flow = self._add_flow(self.lower_tor_port,
output_ports=[self.ptf_port, self.server_nic], priority=10)
output_ports=[self.ptf_port, self.server_nic], priority=11)

# upstream flows
if not duplicate_nic_upstream:
# NOTE: add two flows to direct gRPC traffic to its correct destination
# upstream packet to the upper ToR loopback3 from server NiC should be forwarded to the upper ToR
self.upstream_upper_tor_nic_flow = self._add_flow(
self.server_nic,
packet_filter="tcp,ip_dst=%s" % self.upper_tor_loopback3_ip,
output_ports=[self.lower_tor_port, self.upper_tor_port],
priority=10,
upstream=True,
enable_output_ports=[False, True]
)
# upstream packet to the lower ToR loopback3 from server NiC should be forwarded to the lower ToR
self.upstream_lower_tor_nic_flow = self._add_flow(
self.server_nic,
packet_filter="tcp,ip_dst=%s" % self.lower_tor_loopback3_ip,
output_ports=[self.lower_tor_port, self.upper_tor_port],
priority=10,
upstream=True,
enable_output_ports=[True, False]
)
# upstream packet from server NiC should be directed to both ToRs
self.upstream_nic_flow = self._add_flow(
self.server_nic,
Expand Down Expand Up @@ -572,10 +600,11 @@ def _del_groups(self):
self.upstream_ecmp_group = None
self.groups.clear()

def _add_flow(self, in_port, packet_filter=None, output_ports=[], group=None, priority=None, upstream=False):
def _add_flow(self, in_port, packet_filter=None, output_ports=[], group=None, priority=None,
upstream=False, enable_output_ports=None):
if upstream:
flow = OVSUpstreamFlow(in_port, packet_filter=packet_filter, output_ports=output_ports,
group=group, priority=priority)
group=group, priority=priority, enable_output_ports=enable_output_ports)
else:
flow = OVSFlow(in_port, packet_filter=packet_filter, output_ports=output_ports,
group=group, priority=priority)
Expand Down Expand Up @@ -639,6 +668,18 @@ def set_drop(self, portids, directions, recover):

# recover upstream
# recover upstream traffic from server NiC
if self.upstream_upper_tor_nic_flow.get_port_enable(portid):
if self.upstream_upper_tor_nic_flow.get_drop(portid):
self.upstream_upper_tor_nic_flow.set_drop(
portid=portid, recover=recover)
OVSCommand.ovs_ofctl_mod_flow(
self.bridge_name, self.upstream_upper_tor_nic_flow)
if self.upstream_lower_tor_nic_flow.get_port_enable(portid):
if self.upstream_lower_tor_nic_flow.get_drop(portid):
self.upstream_lower_tor_nic_flow.set_drop(
portid=portid, recover=recover)
OVSCommand.ovs_ofctl_mod_flow(
self.bridge_name, self.upstream_lower_tor_nic_flow)
if self.upstream_nic_flow.get_drop(portid):
self.upstream_nic_flow.set_drop(
portid=portid, recover=recover)
Expand Down Expand Up @@ -690,6 +731,16 @@ def set_drop(self, portids, directions, recover):
elif direction == 1:
# upstream
# drop upstream traffic from server NiC
if self.upstream_upper_tor_nic_flow.get_port_enable(portid):
if not self.upstream_upper_tor_nic_flow.get_drop(portid):
self.upstream_upper_tor_nic_flow.set_drop(portid)
OVSCommand.ovs_ofctl_mod_flow(
self.bridge_name, self.upstream_upper_tor_nic_flow)
if self.upstream_lower_tor_nic_flow.get_port_enable(portid):
if not self.upstream_lower_tor_nic_flow.get_drop(portid):
self.upstream_lower_tor_nic_flow.set_drop(portid)
OVSCommand.ovs_ofctl_mod_flow(
self.bridge_name, self.upstream_lower_tor_nic_flow)
if not self.upstream_nic_flow.get_drop(portid):
self.upstream_nic_flow.set_drop(portid)
OVSCommand.ovs_ofctl_mod_flow(
Expand Down Expand Up @@ -1128,7 +1179,7 @@ def start(self):
class NiCSimulator(nic_simulator_grpc_service_pb2_grpc.DualToRActiveServicer):
"""NiC simulator class, define all the gRPC calls."""

def __init__(self, vm_set, mgmt_port, binding_port, loopback_ips):
def __init__(self, vm_set, mgmt_port, binding_port, loopback_ips, duplicate_nic_upstream=False):
self.vm_set = vm_set
self.server_nics = self._find_all_server_nics()
self.server_nic_addresses = {
Expand All @@ -1144,7 +1195,8 @@ def __init__(self, vm_set, mgmt_port, binding_port, loopback_ips):
if server_nic in self.server_nic_addresses:
server_nic_addr = self.server_nic_addresses[server_nic]
if server_nic_addr is not None:
self.ovs_bridges[server_nic_addr] = OVSBridge(bridge_name, loopback_ips)
self.ovs_bridges[server_nic_addr] = OVSBridge(bridge_name, loopback_ips, duplicate_nic_upstream)

logging.info("Starting NiC simulator to manipulate OVS bridges: %s",
json.dumps(list(self.ovs_bridges.keys()), indent=4))

Expand Down Expand Up @@ -1218,6 +1270,13 @@ def parse_args():
help="the Loopback IPs to duplicate to both ToRs: <Loopback2>,<upper ToR Loopback3>,<lower ToR Loopback3>",
dest="loopback_ips"
)
parser.add_argument(
"-n",
"--duplicate_nic_upstream",
default=False,
action="store_true",
help="Duplicate NIC upstream traffic to both ToRs (default: False)",
)
args = parser.parse_args()
return args

Expand Down Expand Up @@ -1270,7 +1329,7 @@ def main():
loopback_ips = args.loopback_ips.split(",")
if len(loopback_ips) != 3:
raise ValueError("Invalid loopback ips: {loopback_ips}".format(loopback_ips=loopback_ips))
nic_simulator = NiCSimulator(args.vm_set, "mgmt", args.port, loopback_ips)
nic_simulator = NiCSimulator(args.vm_set, "mgmt", args.port, loopback_ips, args.duplicate_nic_upstream)
nic_simulator.start_nic_servers()
try:
nic_simulator.start_mgmt_server()
Expand Down

0 comments on commit 11c9af3

Please sign in to comment.