Skip to content

Commit

Permalink
Using external library for NDMS interactions
Browse files Browse the repository at this point in the history
  • Loading branch information
foxel committed Jul 31, 2018
1 parent 4c8cf58 commit 1034c79
Showing 1 changed file with 21 additions and 120 deletions.
141 changes: 21 additions & 120 deletions homeassistant/components/device_tracker/keenetic_ndms2.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@
https://home-assistant.io/components/device_tracker.keenetic_ndms2/
"""
import logging
import telnetlib
import re
from collections import namedtuple

import voluptuous as vol

Expand All @@ -18,6 +15,8 @@
CONF_HOST, CONF_PORT, CONF_PASSWORD, CONF_USERNAME
)

REQUIREMENTS = ['ndms2_client==0.0.2']

_LOGGER = logging.getLogger(__name__)

# Interface name to track devices for. Most likely one will not need to
Expand All @@ -38,60 +37,29 @@
})


_ARP_CMD = 'show ip arp'
_ARP_REGEX = re.compile(
r'(?P<name>([^\ ]+))\s+' +
r'(?P<ip>([0-9]{1,3}[\.]){3}[0-9]{1,3})\s+' +
r'(?P<mac>(([0-9a-f]{2}[:-]){5}([0-9a-f]{2})))\s+' +
r'(?P<interface>([^\ ]+))\s+'
)


def get_scanner(_hass, config):
"""Validate the configuration and return a Nmap scanner."""
scanner = KeeneticNDMS2DeviceScanner(config[DOMAIN])

return scanner if scanner.success_init else None


def _parse_lines(lines, regex):
"""Parse the lines using the given regular expression.
If a line can't be parsed it is logged and skipped in the output.
"""
results = []
for line in lines:
match = regex.search(line)
if not match:
_LOGGER.debug("Could not parse line: %s", line)
continue
results.append(match.groupdict())
return results


Device = namedtuple('Device', ['mac', 'name', 'ip'])


class KeeneticNDMS2DeviceScanner(DeviceScanner):
"""This class scans for devices using keenetic NDMS2 web interface."""

def __init__(self, config):
"""Initialize the scanner."""
from ndms2_client import Client, TelnetConnection
self.last_results = []

self._host = config[CONF_HOST]
self._port = config[CONF_PORT]
self._interface = config[CONF_INTERFACE]

self._username = config.get(CONF_USERNAME)
self._password = config.get(CONF_PASSWORD)

self.connection = TelnetConnection(
self._host,
self._port,
self._username,
self._password,
)
self._client = Client(TelnetConnection(
config.get(CONF_HOST),
config.get(CONF_PORT),
config.get(CONF_USERNAME),
config.get(CONF_PASSWORD),
))

self.success_init = self._update_info()
_LOGGER.info("Scanner initialized")
Expand Down Expand Up @@ -121,84 +89,17 @@ def get_extra_attributes(self, device):

def _update_info(self):
"""Get ARP from keenetic router."""
_LOGGER.info("Fetching...")

last_results = []

lines = self.connection.run_command(_ARP_CMD)
if not lines:
return False
_LOGGER.info("Fetching devices from router...")

result = _parse_lines(lines, _ARP_REGEX)

for info in result:
if info.get('interface') != self._interface:
continue
mac = info.get('mac')
name = info.get('name')
ip = info.get('ip')
# No address = no item :)
if mac is None:
continue

last_results.append(Device(mac.upper(), name, ip))

self.last_results = last_results

_LOGGER.info("Request successful")
return True


class TelnetConnection(object):
"""Maintains a Telnet connection to a router."""

def __init__(self, host, port, username, password):
"""Initialize the Telnet connection properties."""

self._connected = False
self._telnet = None
self._host = host
self._port = port
self._username = username
self._password = password
self._prompt_string = None

def run_command(self, command):
"""Run a command through a Telnet connection.
Connect to the Telnet server if not currently connected, otherwise
use the existing connection.
"""
from ndms2_client import ConnectionException
try:
if not self._telnet:
self.connect()

self._telnet.write('{}\n'.format(command).encode('ascii'))
return self._telnet.read_until(self._prompt_string, 30)\
.decode('ascii')\
.split('\n')[1:-1]
except Exception as e:
_LOGGER.error("Telnet error: $s", e)
self.disconnect()
return None

def connect(self):
"""Connect to the ASUS-WRT Telnet server."""
self._telnet = telnetlib.Telnet(self._host)
self._telnet.read_until(b'Login: ', 30)
self._telnet.write((self._username + '\n').encode('ascii'))
self._telnet.read_until(b'Password: ', 30)
self._telnet.write((self._password + '\n').encode('ascii'))
self._prompt_string = self._telnet.read_until(b'>').split(b'\n')[-1]

self._connected = True

def disconnect(self):
"""Disconnect the current Telnet connection."""
try:
self._telnet.write(b'exit\n')
except Exception as e:
_LOGGER.error("Telnet error on exit: $s", e)
pass

self._telnet = None
self.last_results = [
dev
for dev in self._client.get_devices()
if dev.interface == self._interface
]
return True

except ConnectionException:
_LOGGER.error("Error fetching data from router")
return False

0 comments on commit 1034c79

Please sign in to comment.