-
-
Notifications
You must be signed in to change notification settings - Fork 32.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
opensky sensor #7061
opensky sensor #7061
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
""" | ||
Sensor for the Open Sky Network. | ||
|
||
For more details about this platform, please refer to the documentation at | ||
https://home-assistant.io/components/sensor.opensky/ | ||
""" | ||
import logging | ||
from datetime import timedelta | ||
|
||
import requests | ||
import voluptuous as vol | ||
|
||
from homeassistant.components.sensor import PLATFORM_SCHEMA | ||
from homeassistant.const import ( | ||
CONF_NAME, CONF_LATITUDE, CONF_LONGITUDE, | ||
ATTR_ATTRIBUTION, ATTR_LATITUDE, ATTR_LONGITUDE) | ||
from homeassistant.helpers.entity import Entity | ||
from homeassistant.util.location import vincenty | ||
import homeassistant.helpers.config_validation as cv | ||
|
||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
SCAN_INTERVAL = timedelta(seconds=12) # opensky public limit is 10 seconds | ||
DOMAIN = 'opensky' | ||
EVENT_OPENSKY_ENTRY = '{}_entry'.format(DOMAIN) | ||
EVENT_OPENSKY_EXIT = '{}_exit'.format(DOMAIN) | ||
CONF_RADIUS = 'radius' | ||
ATTR_SENSOR = 'sensor' | ||
ATTR_STATES = 'states' | ||
ATTR_ON_GROUND = 'on_ground' | ||
ATTR_CALLSIGN = 'callsign' | ||
UNIT_OF_MEASUREMENT = 'flights' | ||
ICON = 'mdi:airplane' | ||
OPENSKY_ATTRIBUTION = "Information provided by the OpenSky Network "\ | ||
"(https://opensky-network.org)" | ||
OPENSKY_API_URL = 'https://opensky-network.org/api/states/all' | ||
OPENSKY_API_FIELDS = [ | ||
'icao24', ATTR_CALLSIGN, 'origin_country', 'time_position', | ||
'time_velocity', ATTR_LONGITUDE, ATTR_LATITUDE, 'altitude', | ||
ATTR_ON_GROUND, 'velocity', 'heading', 'vertical_rate', 'sensors'] | ||
|
||
|
||
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({ | ||
vol.Required(CONF_RADIUS): vol.Coerce(float), | ||
vol.Optional(CONF_NAME): cv.string, | ||
vol.Inclusive(CONF_LATITUDE, 'coordinates'): cv.latitude, | ||
vol.Inclusive(CONF_LONGITUDE, 'coordinates'): cv.longitude | ||
}) | ||
|
||
|
||
# pylint: disable=unused-argument | ||
def setup_platform(hass, config, add_devices, discovery_info=None): | ||
"""Setup the Open Sky platform.""" | ||
session = requests.Session() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Move that into |
||
latitude = config.get(CONF_LATITUDE, hass.config.latitude) | ||
longitude = config.get(CONF_LONGITUDE, hass.config.longitude) | ||
add_devices([OpenSkySensor( | ||
hass, config.get(CONF_NAME, DOMAIN), session, latitude, longitude, | ||
config.get(CONF_RADIUS))], True) | ||
|
||
|
||
class OpenSkySensor(Entity): | ||
"""Open Sky Network Sensor.""" | ||
|
||
def __init__(self, hass, name, session, latitude, longitude, radius): | ||
"""Initialize the sensor.""" | ||
self._session = session | ||
self._latitude = latitude | ||
self._longitude = longitude | ||
self._radius = radius | ||
self._state = 0 | ||
self._hass = hass | ||
self._name = name | ||
self._previously_tracked = None | ||
|
||
@property | ||
def name(self): | ||
"""Return the name of the sensor.""" | ||
return self._name | ||
|
||
@property | ||
def state(self): | ||
"""Return the state of the sensor.""" | ||
return self._state | ||
|
||
def _handle_boundary(self, callsigns, event): | ||
"""Handle flights crossing region boundary.""" | ||
for callsign in callsigns: | ||
data = { | ||
ATTR_CALLSIGN: callsign, | ||
ATTR_SENSOR: self._name | ||
} | ||
self._hass.bus.fire(event, data) | ||
|
||
def update(self): | ||
"""Update device state.""" | ||
currently_tracked = set() | ||
states = self._session.get(OPENSKY_API_URL).json().get(ATTR_STATES) | ||
for state in states: | ||
data = dict(zip(OPENSKY_API_FIELDS, state)) | ||
missing_location = ( | ||
data.get(ATTR_LONGITUDE) is None or | ||
data.get(ATTR_LATITUDE) is None) | ||
if missing_location: | ||
continue | ||
if data.get(ATTR_ON_GROUND): | ||
continue | ||
distance = vincenty( | ||
(self._latitude, self._longitude), | ||
(data.get(ATTR_LATITUDE), data.get(ATTR_LONGITUDE))) | ||
if distance > self._radius: | ||
continue | ||
callsign = data[ATTR_CALLSIGN].strip() | ||
if callsign == '': | ||
continue | ||
currently_tracked.add(callsign) | ||
if self._previously_tracked is not None: | ||
entries = currently_tracked - self._previously_tracked | ||
exits = self._previously_tracked - currently_tracked | ||
self._handle_boundary(entries, EVENT_OPENSKY_ENTRY) | ||
self._handle_boundary(exits, EVENT_OPENSKY_EXIT) | ||
self._state = len(currently_tracked) | ||
self._previously_tracked = currently_tracked | ||
|
||
@property | ||
def device_state_attributes(self): | ||
"""Return the state attributes.""" | ||
return { | ||
ATTR_ATTRIBUTION: OPENSKY_ATTRIBUTION | ||
} | ||
|
||
@property | ||
def unit_of_measurement(self): | ||
"""Unit of measurement.""" | ||
return UNIT_OF_MEASUREMENT | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't use a const if you need it only on one place. |
||
|
||
@property | ||
def icon(self): | ||
"""Icon.""" | ||
return ICON | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't use a const if you need it only on one place. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that we should be friendly and not being too close to the limit of the API. Isn't it enough to get the data every 30 or so?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, if you want to know when a flight enters your region, it'll be gone already in 30 seconds by the time the sensor updates.
But it is a
SCAN_INTERVAL
, so people with that use case could always set it lower if we default to 30.