-
Notifications
You must be signed in to change notification settings - Fork 740
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add server traffic utility to check the traffic between the selected ToR and the server. Signed-off-by: Longxiang Lyu <[email protected]>
- Loading branch information
Showing
4 changed files
with
140 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
from tests.common.devices.base import AnsibleHostBase | ||
|
||
|
||
class VMHost(AnsibleHostBase): | ||
""" | ||
@summary: Class for VM server | ||
For running ansible module on VM server | ||
""" | ||
|
||
def __init__(self, ansible_adhoc, hostname): | ||
AnsibleHostBase.__init__(self, ansible_adhoc, hostname) | ||
|
||
@property | ||
def external_port(self): | ||
if not hasattr(self, "_external_port"): | ||
vm = self.host.options["variable_manager"] | ||
im = self.host.options["inventory_manager"] | ||
hostvars = vm.get_vars(host=im.get_host(self.hostname), include_delegate_to=False) | ||
setattr(self, "_external_port", hostvars["external_port"]) | ||
return getattr(self, "_external_port") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
"""Utils to verify traffic between ToR and server.""" | ||
import contextlib | ||
import logging | ||
import tempfile | ||
import sys | ||
import time | ||
|
||
from io import BytesIO | ||
from ptf.dataplane import match_exp_pkt | ||
from scapy.all import sniff | ||
from scapy.packet import ls | ||
|
||
|
||
@contextlib.contextmanager | ||
def dump_intf_packets(ansible_host, iface, pcap_save_path, dumped_packets, pcap_filter=None): | ||
"""Dump packets of the interface and save to a file.""" | ||
start_pcap = "tcpdump --immediate-mode -i %s -w %s" % (iface, pcap_save_path) | ||
if pcap_filter: | ||
start_pcap += (" " + pcap_filter) | ||
start_pcap = "nohup %s > /dev/null 2>&1 & echo $!" % start_pcap | ||
pid = ansible_host.shell(start_pcap)["stdout"] | ||
# sleep to let tcpdump starts to capture | ||
time.sleep(1) | ||
try: | ||
yield | ||
finally: | ||
ansible_host.shell("kill -s 2 %s" % pid) | ||
with tempfile.NamedTemporaryFile() as temp_pcap: | ||
ansible_host.fetch(src=pcap_save_path, dest=temp_pcap.name, flat=True) | ||
packets = sniff(offline=temp_pcap.name) | ||
dumped_packets.extend(packets) | ||
|
||
|
||
class ServerTrafficMonitor(object): | ||
"""Monit traffic between ToR and server.""" | ||
|
||
VLAN_INTERFACE_TEMPLATE = "{external_port}.{vlan_id}" | ||
|
||
def __init__(self, duthost, vmhost, dut_iface, conn_graph_facts, exp_pkt, existing=True): | ||
self.duthost = duthost | ||
self.dut_iface = dut_iface | ||
self.exp_pkt = exp_pkt | ||
self.vmhost = vmhost | ||
self.conn_graph_facts = conn_graph_facts | ||
self.captured_packets = [] | ||
self.matched_packets = [] | ||
self.vmhost_iface = self._find_vmhost_vlan_interface() | ||
self.dump_utility = dump_intf_packets( | ||
vmhost, | ||
self.vmhost_iface, | ||
tempfile.NamedTemporaryFile().name, | ||
self.captured_packets | ||
) | ||
self.existing = existing | ||
|
||
@staticmethod | ||
def _list_layer_str(packet): | ||
"""Return list layer output string.""" | ||
_stdout, sys.stdout = sys.stdout, BytesIO() | ||
try: | ||
ls(packet) | ||
return sys.stdout.getvalue() | ||
finally: | ||
sys.stdout = _stdout | ||
|
||
def _find_vmhost_vlan_interface(self): | ||
"""Find the vmhost vlan interface that will be sniffed on.""" | ||
device_port_vlans = self.conn_graph_facts["device_port_vlans"][self.duthost.hostname] | ||
vlan_id = device_port_vlans[self.dut_iface]["vlanlist"][0] | ||
return self.VLAN_INTERFACE_TEMPLATE.format(external_port=self.vmhost.external_port, vlan_id=vlan_id) | ||
|
||
def __enter__(self): | ||
self.captured_packets[:] = [] | ||
self.matched_packets[:] = [] | ||
self.dump_utility.__enter__() | ||
|
||
def __exit__(self, exc_type, exc_value, traceback): | ||
self.dump_utility.__exit__(exc_type, exc_value, traceback) | ||
logging.info("the expected packet:\n%s", str(self.exp_pkt)) | ||
self.matched_packets = [p for p in self.captured_packets if match_exp_pkt(self.exp_pkt, p)] | ||
logging.info("received %d matched packets", len(self.matched_packets)) | ||
if self.matched_packets: | ||
logging.info( | ||
"display the most recent matched captured packet:\n%s", | ||
self._list_layer_str(self.matched_packets[-1]) | ||
) | ||
if self.existing and not self.matched_packets: | ||
raise ValueError("Failed to find expected packet.") | ||
if not self.existing and self.matched_packets: | ||
raise ValueError("Found expected packet.") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters