Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed NDMS for latest firmware #15511

Merged
merged 5 commits into from
Jul 31, 2018
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 113 additions & 30 deletions homeassistant/components/device_tracker/keenetic_ndms2.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@
https://home-assistant.io/components/device_tracker.keenetic_ndms2/
"""
import logging
import telnetlib
import re
from collections import namedtuple

import requests
import voluptuous as vol

import homeassistant.helpers.config_validation as cv
from homeassistant.components.device_tracker import (
DOMAIN, PLATFORM_SCHEMA, DeviceScanner)
from homeassistant.const import (
CONF_HOST, CONF_PASSWORD, CONF_USERNAME
CONF_HOST, CONF_PORT, CONF_PASSWORD, CONF_USERNAME
)

_LOGGER = logging.getLogger(__name__)
Expand All @@ -25,24 +26,50 @@
CONF_INTERFACE = 'interface'

DEFAULT_INTERFACE = 'Home'
DEFAULT_PORT = 23


PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_USERNAME): cv.string,
vol.Required(CONF_PORT, default=DEFAULT_PORT): cv.port,
vol.Required(CONF_PASSWORD): cv.string,
vol.Required(CONF_INTERFACE, default=DEFAULT_INTERFACE): cv.string,
})


_ARP_CMD = 'show ip arp'
_ARP_REGEX = re.compile(
r'(?P<name>([^\ ]+))\s+' +
r'(?P<ip>([0-9]{1,3}[\.]){3}[0-9]{1,3})\s+' +
r'(?P<mac>(([0-9a-f]{2}[:-]){5}([0-9a-f]{2})))\s+' +
r'(?P<interface>([^\ ]+))\s+'
)


def get_scanner(_hass, config):
"""Validate the configuration and return a Nmap scanner."""
scanner = KeeneticNDMS2DeviceScanner(config[DOMAIN])

return scanner if scanner.success_init else None


Device = namedtuple('Device', ['mac', 'name'])
def _parse_lines(lines, regex):
"""Parse the lines using the given regular expression.

If a line can't be parsed it is logged and skipped in the output.
"""
results = []
for line in lines:
match = regex.search(line)
if not match:
_LOGGER.debug("Could not parse line: %s", line)
continue
results.append(match.groupdict())
return results


Device = namedtuple('Device', ['mac', 'name', 'ip'])


class KeeneticNDMS2DeviceScanner(DeviceScanner):
Expand All @@ -52,12 +79,20 @@ def __init__(self, config):
"""Initialize the scanner."""
self.last_results = []

self._url = 'http://%s/rci/show/ip/arp' % config[CONF_HOST]
self._host = config[CONF_HOST]
self._port = config[CONF_PORT]
self._interface = config[CONF_INTERFACE]

self._username = config.get(CONF_USERNAME)
self._password = config.get(CONF_PASSWORD)

self.connection = TelnetConnection(
self._host,
self._port,
self._username,
self._password,
)

self.success_init = self._update_info()
_LOGGER.info("Scanner initialized")

Expand All @@ -67,55 +102,103 @@ def scan_devices(self):

return [device.mac for device in self.last_results]

def get_device_name(self, device):
def get_device_name(self, mac):
"""Return the name of the given device or None if we don't know."""
filter_named = [result.name for result in self.last_results
if result.mac == device]
filter_name = [device.name for device in self.last_results
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks weird. Use a different name for the iteration variable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a generator expression and next for speed and clean code:

name = next((
    result.name for result in self.last_results
    if result.mac == device), None)
return name

if device.mac == mac]

if filter_named:
return filter_named[0]
if filter_name:
return filter_name[0]
return None

def get_extra_attributes(self, device):
"""Return the IP of the given device."""
filter_ip = [result.ip for result in self.last_results
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a generator expression and next here too.

Copy link
Contributor Author

@foxel foxel Jul 31, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is get_extra_attributes expected to return None? e.g. in case of the device not found.

if result.mac == device]
if filter_ip:
return {'ip': filter_ip[0]}
return {'ip': None}

def _update_info(self):
"""Get ARP from keenetic router."""
_LOGGER.info("Fetching...")

last_results = []

# doing a request
try:
from requests.auth import HTTPDigestAuth
res = requests.get(self._url, timeout=10, auth=HTTPDigestAuth(
self._username, self._password
))
except requests.exceptions.Timeout:
_LOGGER.error(
"Connection to the router timed out at URL %s", self._url)
return False
if res.status_code != 200:
_LOGGER.error(
"Connection failed with http code %s", res.status_code)
return False
try:
result = res.json()
except ValueError:
# If json decoder could not parse the response
_LOGGER.error("Failed to parse response from router")
lines = self.connection.run_command(_ARP_CMD)
if not lines:
return False

# parsing response
result = _parse_lines(lines, _ARP_REGEX)

for info in result:
if info.get('interface') != self._interface:
continue
mac = info.get('mac')
name = info.get('name')
ip = info.get('ip')
# No address = no item :)
if mac is None:
continue

last_results.append(Device(mac.upper(), name))
last_results.append(Device(mac.upper(), name, ip))

self.last_results = last_results

_LOGGER.info("Request successful")
return True


class TelnetConnection(object):
"""Maintains a Telnet connection to a router."""

def __init__(self, host, port, username, password):
"""Initialize the Telnet connection properties."""

self._connected = False
self._telnet = None
self._host = host
self._port = port
self._username = username
self._password = password
self._prompt_string = None

def run_command(self, command):
"""Run a command through a Telnet connection.

Connect to the Telnet server if not currently connected, otherwise
use the existing connection.
"""
try:
if not self._telnet:
self.connect()

self._telnet.write('{}\n'.format(command).encode('ascii'))
return self._telnet.read_until(self._prompt_string, 30)\
.decode('ascii')\
.split('\n')[1:-1]
except Exception as e:
_LOGGER.error("Telnet error: $s", e)
self.disconnect()
return None

def connect(self):
"""Connect to the ASUS-WRT Telnet server."""
self._telnet = telnetlib.Telnet(self._host)
self._telnet.read_until(b'Login: ', 30)
self._telnet.write((self._username + '\n').encode('ascii'))
self._telnet.read_until(b'Password: ', 30)
self._telnet.write((self._password + '\n').encode('ascii'))
self._prompt_string = self._telnet.read_until(b'>').split(b'\n')[-1]

self._connected = True

def disconnect(self):
"""Disconnect the current Telnet connection."""
try:
self._telnet.write(b'exit\n')
except Exception as e:
_LOGGER.error("Telnet error on exit: $s", e)
pass

self._telnet = None