diff --git a/ansible/dualtor/nic_simulator/nic_simulator.py b/ansible/dualtor/nic_simulator/nic_simulator.py index e60ab1ed48f..21fc1f7d2dd 100644 --- a/ansible/dualtor/nic_simulator/nic_simulator.py +++ b/ansible/dualtor/nic_simulator/nic_simulator.py @@ -100,7 +100,7 @@ class OVSCommand(object): OVS_VSCTL_LIST_PORTS_CMD = "ovs-vsctl list-ports {bridge_name}" OVS_OFCTL_DEL_FLOWS_CMD = "ovs-ofctl del-flows {bridge_name}" OVS_OFCTL_ADD_FLOWS_CMD = "ovs-ofctl add-flow {bridge_name} {flow}" - OVS_OFCTL_MOD_FLOWS_CMD = "ovs-ofctl mod-flows {bridge_name} {flow}" + OVS_OFCTL_MOD_FLOWS_CMD = "ovs-ofctl --strict mod-flows {bridge_name} {flow}" OVS_OFCTL_DEL_GROUPS_CMD = "ovs-ofctl -O OpenFlow13 del-groups {bridge_name}" OVS_OFCTL_ADD_GROUP_CMD = "ovs-ofctl -O OpenFlow13 add-group {bridge_name} {group}" OVS_OFCTL_MOD_GROUP_CMD = "ovs-ofctl -O OpenFlow13 mod-group {bridge_name} {group}" @@ -251,19 +251,21 @@ def set_drop(self, recover=False): class OVSUpstreamFlow(OVSFlow): """Object to represent an OVS upstream flow to output to both ToRs.""" - __slots__ = ("drop_output",) + __slots__ = ("drop_output", "enable_output_ports") - def __init__(self, in_port, packet_filter=None, output_ports=[], group=None, priority=None): + def __init__(self, in_port, packet_filter=None, output_ports=[], + group=None, priority=None, enable_output_ports=None): super(OVSUpstreamFlow, self).__init__( in_port, packet_filter, output_ports, group, priority) self.drop_output = [False, False] + self.enable_output_ports = enable_output_ports if enable_output_ports else [True, True] def to_string(self): flow_parts = [self._str_prefix] has_output = False if self.output_ports: output = ["output:%s" % port for (portid, port) in enumerate(self.output_ports) - if not self.drop_output[portid]] + if (self.get_port_enable(portid) and (not self.get_drop(portid)))] has_output = bool(output) if has_output: flow_parts.append("actions=%s" % ",".join(output)) @@ -272,6 +274,9 @@ def to_string(self): flow_parts.append("actions=drop") return ",".join(flow_parts) + def get_port_enable(self, portid): + return self.enable_output_ports[portid] + def get_drop(self, portid): return self.drop_output[portid] @@ -283,7 +288,8 @@ def set_drop(self, portid=None, recover=False): else: self.drop_output[portid] = is_drop - self.reset() + if self.get_port_enable(portid): + self.reset() class ForwardingState(object): @@ -418,6 +424,8 @@ class OVSBridge(object): "downstream_upper_tor_flow", "downstream_lower_tor_flow", "upstream_nic_flow", + "upstream_upper_tor_nic_flow", + "upstream_lower_tor_nic_flow", "upstream_loopback2_flow", "upstream_upper_tor_loopback3_flow", "upstream_lower_tor_loopback3_flow", @@ -426,7 +434,7 @@ class OVSBridge(object): "flap_counter" ) - def __init__(self, bridge_name, loopback_ips): + def __init__(self, bridge_name, loopback_ips, duplicate_nic_upstream=False): self.bridge_name = bridge_name self.loopback2_ip = loopback_ips[0] self.upper_tor_loopback3_ip = loopback_ips[1] @@ -442,7 +450,7 @@ def __init__(self, bridge_name, loopback_ips): self.flows = [] self.groups = [] self._init_ports() - self._init_flows() + self._init_flows(duplicate_nic_upstream) self.states_getter = { 1: self.upstream_ecmp_flow.get_upper_tor_forwarding_state, 0: self.upstream_ecmp_flow.get_lower_tor_forwarding_state @@ -489,18 +497,38 @@ def _init_ports(self): self.lower_tor_port ) - def _init_flows(self): + def _init_flows(self, duplicate_nic_upstream=False): """Initialize OVS flows for the bridge.""" logging.info("Init flows for bridge %s", self.bridge_name) self._del_flows() self._del_groups() # downstream flows self.downstream_upper_tor_flow = self._add_flow(self.upper_tor_port, - output_ports=[self.ptf_port, self.server_nic], priority=10) + output_ports=[self.ptf_port, self.server_nic], priority=11) self.downstream_lower_tor_flow = self._add_flow(self.lower_tor_port, - output_ports=[self.ptf_port, self.server_nic], priority=10) + output_ports=[self.ptf_port, self.server_nic], priority=11) # upstream flows + if not duplicate_nic_upstream: + # NOTE: add two flows to direct gRPC traffic to its correct destination + # upstream packet to the upper ToR loopback3 from server NiC should be forwarded to the upper ToR + self.upstream_upper_tor_nic_flow = self._add_flow( + self.server_nic, + packet_filter="tcp,ip_dst=%s" % self.upper_tor_loopback3_ip, + output_ports=[self.lower_tor_port, self.upper_tor_port], + priority=10, + upstream=True, + enable_output_ports=[False, True] + ) + # upstream packet to the lower ToR loopback3 from server NiC should be forwarded to the lower ToR + self.upstream_lower_tor_nic_flow = self._add_flow( + self.server_nic, + packet_filter="tcp,ip_dst=%s" % self.lower_tor_loopback3_ip, + output_ports=[self.lower_tor_port, self.upper_tor_port], + priority=10, + upstream=True, + enable_output_ports=[True, False] + ) # upstream packet from server NiC should be directed to both ToRs self.upstream_nic_flow = self._add_flow( self.server_nic, @@ -572,10 +600,11 @@ def _del_groups(self): self.upstream_ecmp_group = None self.groups.clear() - def _add_flow(self, in_port, packet_filter=None, output_ports=[], group=None, priority=None, upstream=False): + def _add_flow(self, in_port, packet_filter=None, output_ports=[], group=None, priority=None, + upstream=False, enable_output_ports=None): if upstream: flow = OVSUpstreamFlow(in_port, packet_filter=packet_filter, output_ports=output_ports, - group=group, priority=priority) + group=group, priority=priority, enable_output_ports=enable_output_ports) else: flow = OVSFlow(in_port, packet_filter=packet_filter, output_ports=output_ports, group=group, priority=priority) @@ -639,6 +668,18 @@ def set_drop(self, portids, directions, recover): # recover upstream # recover upstream traffic from server NiC + if self.upstream_upper_tor_nic_flow.get_port_enable(portid): + if self.upstream_upper_tor_nic_flow.get_drop(portid): + self.upstream_upper_tor_nic_flow.set_drop( + portid=portid, recover=recover) + OVSCommand.ovs_ofctl_mod_flow( + self.bridge_name, self.upstream_upper_tor_nic_flow) + if self.upstream_lower_tor_nic_flow.get_port_enable(portid): + if self.upstream_lower_tor_nic_flow.get_drop(portid): + self.upstream_lower_tor_nic_flow.set_drop( + portid=portid, recover=recover) + OVSCommand.ovs_ofctl_mod_flow( + self.bridge_name, self.upstream_lower_tor_nic_flow) if self.upstream_nic_flow.get_drop(portid): self.upstream_nic_flow.set_drop( portid=portid, recover=recover) @@ -690,6 +731,16 @@ def set_drop(self, portids, directions, recover): elif direction == 1: # upstream # drop upstream traffic from server NiC + if self.upstream_upper_tor_nic_flow.get_port_enable(portid): + if not self.upstream_upper_tor_nic_flow.get_drop(portid): + self.upstream_upper_tor_nic_flow.set_drop(portid) + OVSCommand.ovs_ofctl_mod_flow( + self.bridge_name, self.upstream_upper_tor_nic_flow) + if self.upstream_lower_tor_nic_flow.get_port_enable(portid): + if not self.upstream_lower_tor_nic_flow.get_drop(portid): + self.upstream_lower_tor_nic_flow.set_drop(portid) + OVSCommand.ovs_ofctl_mod_flow( + self.bridge_name, self.upstream_lower_tor_nic_flow) if not self.upstream_nic_flow.get_drop(portid): self.upstream_nic_flow.set_drop(portid) OVSCommand.ovs_ofctl_mod_flow( @@ -1128,7 +1179,7 @@ def start(self): class NiCSimulator(nic_simulator_grpc_service_pb2_grpc.DualToRActiveServicer): """NiC simulator class, define all the gRPC calls.""" - def __init__(self, vm_set, mgmt_port, binding_port, loopback_ips): + def __init__(self, vm_set, mgmt_port, binding_port, loopback_ips, duplicate_nic_upstream=False): self.vm_set = vm_set self.server_nics = self._find_all_server_nics() self.server_nic_addresses = { @@ -1144,7 +1195,8 @@ def __init__(self, vm_set, mgmt_port, binding_port, loopback_ips): if server_nic in self.server_nic_addresses: server_nic_addr = self.server_nic_addresses[server_nic] if server_nic_addr is not None: - self.ovs_bridges[server_nic_addr] = OVSBridge(bridge_name, loopback_ips) + self.ovs_bridges[server_nic_addr] = OVSBridge(bridge_name, loopback_ips, duplicate_nic_upstream) + logging.info("Starting NiC simulator to manipulate OVS bridges: %s", json.dumps(list(self.ovs_bridges.keys()), indent=4)) @@ -1218,6 +1270,13 @@ def parse_args(): help="the Loopback IPs to duplicate to both ToRs: ,,", dest="loopback_ips" ) + parser.add_argument( + "-n", + "--duplicate_nic_upstream", + default=False, + action="store_true", + help="Duplicate NIC upstream traffic to both ToRs (default: False)", + ) args = parser.parse_args() return args @@ -1270,7 +1329,7 @@ def main(): loopback_ips = args.loopback_ips.split(",") if len(loopback_ips) != 3: raise ValueError("Invalid loopback ips: {loopback_ips}".format(loopback_ips=loopback_ips)) - nic_simulator = NiCSimulator(args.vm_set, "mgmt", args.port, loopback_ips) + nic_simulator = NiCSimulator(args.vm_set, "mgmt", args.port, loopback_ips, args.duplicate_nic_upstream) nic_simulator.start_nic_servers() try: nic_simulator.start_mgmt_server()