forked from kernelkit/infix
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
cli: rewrite interface pretty printer in phyton3
The old version (bash) where slow and cluttered. This version is much faster and easier to read / maintain. Signed-off-by: Richard Alpe <[email protected]>
- Loading branch information
Showing
3 changed files
with
209 additions
and
129 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,203 @@ | ||
#!/usr/bin/env python3 | ||
import json | ||
import sys | ||
import argparse | ||
|
||
parser = argparse.ArgumentParser(description="JSON CLI Pretty Printer") | ||
parser.add_argument("module", help="IETF Module") | ||
parser.add_argument("-n", "--name", help="Focus on specific name") | ||
args = parser.parse_args() | ||
|
||
class Pad: | ||
iface = 15 + 2 # + 2 is ASCII tree for nesting | ||
proto = 20 | ||
state = 15 | ||
data = 40 | ||
_data = iface + proto + state | ||
|
||
class Decore(): | ||
@staticmethod | ||
def decorate(sgr, txt, restore="0"): | ||
return f"\033[{sgr}m{txt}\033[{restore}m" | ||
|
||
@staticmethod | ||
def invert(txt): | ||
return Decore.decorate("7", txt) | ||
|
||
@staticmethod | ||
def red(txt): | ||
return Decore.decorate("31", txt, "39") | ||
|
||
@staticmethod | ||
def green(txt): | ||
return Decore.decorate("32", txt, "39") | ||
|
||
class Iface: | ||
def __init__(self, data): | ||
self.data = data | ||
self.name = data.get('name', '') | ||
self.index = data.get('if-index', None) | ||
self.oper_status = data.get('oper-status', None) | ||
self.phys_address = data.get('phys-address', None) | ||
|
||
self.parent = data.get('ietf-if-extensions:parent-interface', None) | ||
|
||
if self.data.get('ietf-ip:ipv4'): | ||
self.mtu = self.data.get('ietf-ip:ipv4').get('mtu', None) | ||
self.ipv4_addr = self.data.get('ietf-ip:ipv4').get('address', None) | ||
else: | ||
self.mtu = None | ||
self.ipv4_addr = [] | ||
|
||
if self.data.get('infix-interfaces:bridge-port'): | ||
self.bridge = self.data.get('infix-interfaces:bridge-port').get('bridge', None) | ||
else: | ||
self.bridge = None | ||
|
||
def is_vlan(self): | ||
return self.data['type'] == "iana-if-type:l2vlan" | ||
|
||
def is_etherent(self): | ||
return self.data['type'] == "iana-if-type:ethernetCsmacd" | ||
|
||
def is_bridge(self): | ||
return self.data['type'] == "iana-if-type:bridge" | ||
|
||
def pr_name(self, pipe=""): | ||
print(f"{pipe}{self.name:<{Pad.iface - len(pipe)}}", end="") | ||
|
||
|
||
def pr_proto_ipv4(self, pipe=''): | ||
addr = self.data.get("ietf-ip:ipv4") | ||
if not addr: | ||
return | ||
|
||
addresses = self.data.get("ietf-ip:ipv4", {}).get("address", []) | ||
for addr in addresses: | ||
row = f"{pipe:<{Pad.iface}}" | ||
row += f"{'ipv4':<{Pad.proto}}" | ||
row += f"{'':<{Pad.state}}{addr['ip']}/{addr['prefix-length']}" | ||
print(row) | ||
|
||
def pr_proto_eth(self): | ||
row = f"{'ethernet':<{Pad.proto}}" | ||
dec = Decore.green if self.data['oper-status'] == "up" else Decore.red | ||
row += dec(f"{self.data['oper-status'].upper():<{Pad.state}}") | ||
row += f"{self.data['phys-address']:<{Pad.data}}" | ||
print(row) | ||
|
||
def pr_bridge(self, _ifaces): | ||
self.pr_name(pipe="") | ||
self.pr_proto_eth() | ||
|
||
|
||
lowers = [] | ||
for _iface in [Iface(data) for data in _ifaces]: | ||
if _iface.bridge and _iface.bridge == self.name: | ||
lowers.append(_iface) | ||
|
||
if lowers: | ||
self.pr_proto_ipv4(pipe='│') | ||
else: | ||
self.pr_proto_ipv4() | ||
|
||
for i, lower in enumerate(lowers): | ||
pipe = '└ ' if (i == len(lowers) -1) else '├ ' | ||
lower.pr_name(pipe) | ||
lower.pr_proto_eth() | ||
|
||
def pr_vlan(self, _ifaces): | ||
self.pr_name(pipe="") | ||
self.pr_proto_eth() | ||
|
||
if self.parent: | ||
self.pr_proto_ipv4(pipe='│') | ||
else: | ||
self.pr_proto_ipv4() | ||
return | ||
|
||
parent = find_iface(_ifaces, self.parent) | ||
if not parent: | ||
print(f"Error, didn't find parent interface for vlan {self.name}") | ||
sys.exit(1) | ||
parent.pr_name(pipe='└ ') | ||
parent.pr_proto_eth() | ||
|
||
def pr_iface(self): | ||
print(f"{'name':<{20}}: {self.name}") | ||
if self.index: | ||
print(f"{'index':<{20}}: {self.index}") | ||
if self.mtu: | ||
print(f"{'mtu':<{20}}: {self.mtu}") | ||
if self.oper_status: | ||
print(f"{'operational status':<{20}}: {self.oper_status}") | ||
if self.phys_address: | ||
print(f"{'physical address':<{20}}: {self.phys_address}") | ||
|
||
if self.ipv4_addr: | ||
first = True | ||
print("") | ||
for addr in self.ipv4_addr: | ||
key = 'ipv4 addresses' if first else '' | ||
print(f"{key:<{20}}: {addr['ip']}/{addr['prefix-length']}") | ||
first = False | ||
|
||
def find_iface(_ifaces, name): | ||
for _iface in [Iface(data) for data in _ifaces]: | ||
if _iface.name == name: | ||
return _iface | ||
|
||
return False | ||
|
||
|
||
def pr_interface_list(json): | ||
hdr = (f"{'INTERFACE':<{Pad.iface}}" | ||
f"{'PROTOCOL':<{Pad.proto}}" | ||
f"{'STATE':<{Pad.state}}" | ||
f"{'DATA':<{Pad.data}}") | ||
|
||
print(Decore.invert(hdr)) | ||
|
||
ifaces = sorted(json["ietf-interfaces:interfaces"]["interface"], key=lambda x: x['name']) | ||
|
||
for iface in [Iface(data) for data in ifaces]: | ||
if iface.is_bridge(): | ||
iface.pr_bridge(ifaces) | ||
continue | ||
|
||
if iface.is_vlan(): | ||
iface.pr_vlan(ifaces) | ||
continue | ||
|
||
# These interfaces are printed by there parent, such as bridge | ||
if iface.parent: | ||
continue | ||
if iface.bridge: | ||
continue | ||
|
||
iface.pr_name() | ||
iface.pr_proto_eth() | ||
iface.pr_proto_ipv4() | ||
|
||
def ietf_interfaces(json, name): | ||
if not json or not json.get("ietf-interfaces:interfaces"): | ||
print(f"Error, top level \"ietf-interfaces:interfaces\" missing") | ||
sys.exit(1) | ||
|
||
if not name: | ||
return pr_interface_list(json) | ||
|
||
iface = find_iface(json["ietf-interfaces:interfaces"]["interface"], name) | ||
if not iface: | ||
print(f"Interface {name} not found") | ||
sys.exit(1) | ||
return iface.pr_iface() | ||
|
||
|
||
json = json.load(sys.stdin) | ||
|
||
if args.module == "ietf-interfaces": | ||
sys.exit(ietf_interfaces(json, args.name)) | ||
else: | ||
print(f"Error, unknown module {args.module}") | ||
sys.exit(1) |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters