Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reconfigure MQTT binary_sensor component if discovery info is changed #18169

Merged
110 changes: 71 additions & 39 deletions homeassistant/components/binary_sensor/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
https://home-assistant.io/components/binary_sensor.mqtt/
"""
import logging
from typing import Optional

import voluptuous as vol

Expand All @@ -19,7 +18,8 @@
from homeassistant.components.mqtt import (
ATTR_DISCOVERY_HASH, CONF_STATE_TOPIC, CONF_AVAILABILITY_TOPIC,
CONF_PAYLOAD_AVAILABLE, CONF_PAYLOAD_NOT_AVAILABLE, CONF_QOS,
MqttAvailability, MqttDiscoveryUpdate, MqttEntityDeviceInfo)
MqttAvailability, MqttDiscoveryUpdate, MqttEntityDeviceInfo,
subscription)
from homeassistant.components.mqtt.discovery import MQTT_DISCOVERY_NEW
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_connect
Expand Down Expand Up @@ -79,57 +79,83 @@ async def _async_setup_entity(hass, config, async_add_entities,
value_template.hass = hass

async_add_entities([MqttBinarySensor(
config.get(CONF_NAME),
config.get(CONF_STATE_TOPIC),
config.get(CONF_AVAILABILITY_TOPIC),
config.get(CONF_DEVICE_CLASS),
config.get(CONF_QOS),
config.get(CONF_FORCE_UPDATE),
config.get(CONF_OFF_DELAY),
config.get(CONF_PAYLOAD_ON),
config.get(CONF_PAYLOAD_OFF),
config.get(CONF_PAYLOAD_AVAILABLE),
config.get(CONF_PAYLOAD_NOT_AVAILABLE),
value_template,
config.get(CONF_UNIQUE_ID),
config.get(CONF_DEVICE),
discovery_hash,
config,
discovery_hash
)])


class MqttBinarySensor(MqttAvailability, MqttDiscoveryUpdate,
MqttEntityDeviceInfo, BinarySensorDevice):
"""Representation a binary sensor that is updated by MQTT."""

def __init__(self, name, state_topic, availability_topic, device_class,
qos, force_update, off_delay, payload_on, payload_off,
payload_available, payload_not_available, value_template,
unique_id: Optional[str], device_config: Optional[ConfigType],
discovery_hash):
def __init__(self, config, discovery_hash):
"""Initialize the MQTT binary sensor."""
MqttAvailability.__init__(self, availability_topic, qos,
payload_available, payload_not_available)
MqttDiscoveryUpdate.__init__(self, discovery_hash)
MqttEntityDeviceInfo.__init__(self, device_config)
self._name = name
self._config = config
self._state = None
self._state_topic = state_topic
self._device_class = device_class
self._payload_on = payload_on
self._payload_off = payload_off
self._qos = qos
self._force_update = force_update
self._off_delay = off_delay
self._template = value_template
self._unique_id = unique_id
self._discovery_hash = discovery_hash
self._sub_state = None
self._delay_listener = None

self._name = None
self._state_topic = None
self._device_class = None
self._payload_on = None
self._payload_off = None
self._qos = None
self._force_update = None
self._off_delay = None
self._template = None
self._unique_id = None

# Load config
self._setup_from_config(config)

availability_topic = config.get(CONF_AVAILABILITY_TOPIC)
payload_available = config.get(CONF_PAYLOAD_AVAILABLE)
payload_not_available = config.get(CONF_PAYLOAD_NOT_AVAILABLE)
device_config = config.get(CONF_DEVICE)

MqttAvailability.__init__(self, availability_topic, self._qos,
payload_available, payload_not_available)
MqttDiscoveryUpdate.__init__(self, discovery_hash,
self.discovery_update)
MqttEntityDeviceInfo.__init__(self, device_config)

async def async_added_to_hass(self):
"""Subscribe mqtt events."""
await MqttAvailability.async_added_to_hass(self)
await MqttDiscoveryUpdate.async_added_to_hass(self)
await self._subscribe_topics()

async def discovery_update(self, discovery_payload):
"""Handle updated discovery message."""
config = PLATFORM_SCHEMA(discovery_payload)
self._setup_from_config(config)
await self.availability_discovery_update(config)
await self._subscribe_topics()
self.async_schedule_update_ha_state()

def _setup_from_config(self, config):
"""(Re)Setup the entity."""
self._name = config.get(CONF_NAME)
self._state_topic = config.get(CONF_STATE_TOPIC)
self._device_class = config.get(CONF_DEVICE_CLASS)
self._qos = config.get(CONF_QOS)
self._force_update = config.get(CONF_FORCE_UPDATE)
self._off_delay = config.get(CONF_OFF_DELAY)
self._payload_on = config.get(CONF_PAYLOAD_ON)
self._payload_off = config.get(CONF_PAYLOAD_OFF)
value_template = config.get(CONF_VALUE_TEMPLATE)
if value_template is not None:
value_template.hass = self.hass
self._template = value_template

self._unique_id = config.get(CONF_UNIQUE_ID)

# TODO: Handle changed device?
# config.get(CONF_DEVICE),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@OttoWinter What do you suggest to do if device info is changed, just update the MqttEntityDeviceInfo?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not the same as availability_discovery_update, just called device_info_discovery_update - this method would set the internal self._device_info object to the latest values.

(It might be the case though that the device_info is only read when the device is initially added to hass, then I guess we ignore the update to this attribute, as it's probably not used too often)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be the case though that the device_info is only read when the device is initially added to hass

Yes, this seems to be the cased based on some quick testing.
Either just ignore this case as you suggest, or delete + re-add the component.


async def _subscribe_topics(self):
"""(Re)Subscribe to topics."""
@callback
def off_delay_listener(now):
"""Switch device off after a delay."""
Expand Down Expand Up @@ -162,8 +188,14 @@ def state_message_received(_topic, payload, _qos):

self.async_schedule_update_ha_state()

await mqtt.async_subscribe(
self.hass, self._state_topic, state_message_received, self._qos)
self._sub_state = await subscription.async_subscribe_topics(
self.hass, self._sub_state, {'state_topic': self._state_topic},
state_message_received, self._qos)

async def async_will_remove_from_hass(self):
"""Unsubscribe when removed."""
await subscription.async_unsubscribe_topics(self.hass, self._sub_state)
await MqttAvailability.async_will_remove_from_hass(self)

@property
def should_poll(self):
Expand Down
38 changes: 33 additions & 5 deletions homeassistant/components/mqtt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -832,12 +832,30 @@ def __init__(self, availability_topic: Optional[str], qos: Optional[int],
self._available = availability_topic is None # type: bool
self._payload_available = payload_available
self._payload_not_available = payload_not_available
self._availability_sub_state = None

async def async_added_to_hass(self) -> None:
"""Subscribe MQTT events.

This method must be run in the event loop and returns a coroutine.
"""
await self._availability_subscribe_topics()

async def availability_discovery_update(self, config: dict):
"""Handle updated discovery message."""
self._availability_setup_from_config(config)
await self._availability_subscribe_topics()

def _availability_setup_from_config(self, config):
"""(Re)Setup."""
self._availability_topic = config.get(CONF_AVAILABILITY_TOPIC)
self._payload_available = config.get(CONF_PAYLOAD_AVAILABLE)
self._payload_not_available = config.get(CONF_PAYLOAD_NOT_AVAILABLE)

async def _availability_subscribe_topics(self):
"""(Re)Subscribe to topics."""
from .subscription import async_subscribe_topics

@callback
def availability_message_received(topic: str,
payload: SubscribePayloadType,
Expand All @@ -850,10 +868,15 @@ def availability_message_received(topic: str,

self.async_schedule_update_ha_state()

if self._availability_topic is not None:
await async_subscribe(
self.hass, self._availability_topic,
availability_message_received, self._availability_qos)
self._availability_sub_state = await async_subscribe_topics(
self.hass, self._availability_sub_state,
{'availability_topic': self._availability_topic},
availability_message_received, self._availability_qos)

async def async_will_remove_from_hass(self):
"""Unsubscribe when removed."""
from .subscription import async_unsubscribe_topics
await async_unsubscribe_topics(self.hass, self._availability_sub_state)

@property
def available(self) -> bool:
Expand All @@ -864,9 +887,10 @@ def available(self) -> bool:
class MqttDiscoveryUpdate(Entity):
"""Mixin used to handle updated discovery message."""

def __init__(self, discovery_hash) -> None:
def __init__(self, discovery_hash, discovery_update=None) -> None:
"""Initialize the discovery update mixin."""
self._discovery_hash = discovery_hash
self._discovery_update = discovery_update
self._remove_signal = None

async def async_added_to_hass(self) -> None:
Expand All @@ -886,6 +910,10 @@ def discovery_callback(payload):
self.hass.async_create_task(self.async_remove())
del self.hass.data[ALREADY_DISCOVERED][self._discovery_hash]
self._remove_signal()
emontnemery marked this conversation as resolved.
Show resolved Hide resolved
elif self._discovery_update:
# Non-empty payload: Notify component
_LOGGER.info("Updating component: %s", self.entity_id)
self.hass.async_create_task(self._discovery_update(payload))

if self._discovery_hash:
self._remove_signal = async_dispatcher_connect(
Expand Down
35 changes: 19 additions & 16 deletions homeassistant/components/mqtt/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,38 +206,41 @@ async def async_device_message_received(topic, payload, qos):
if value[-1] == TOPIC_BASE and key.endswith('_topic'):
payload[key] = "{}{}".format(value[:-1], base)

# If present, the node_id will be included in the discovered object id
discovery_id = '_'.join((node_id, object_id)) if node_id else object_id

if ALREADY_DISCOVERED not in hass.data:
hass.data[ALREADY_DISCOVERED] = {}

# If present, unique_id is used as the discovered object id. Otherwise,
# if present, the node_id will be included in the discovered object id
discovery_id = payload.get(
'unique_id', '_'.join(
(node_id, object_id)) if node_id else object_id)
emontnemery marked this conversation as resolved.
Show resolved Hide resolved
discovery_hash = (component, discovery_id)

if discovery_hash in hass.data[ALREADY_DISCOVERED]:
_LOGGER.info(
"Component has already been discovered: %s %s, sending update",
component, discovery_id)
async_dispatcher_send(
hass, MQTT_DISCOVERY_UPDATED.format(discovery_hash), payload)
elif payload:
# Add component
if payload:
platform = payload.get(CONF_PLATFORM, 'mqtt')
if platform not in ALLOWED_PLATFORMS.get(component, []):
_LOGGER.warning("Platform %s (component %s) is not allowed",
platform, component)
return

payload[CONF_PLATFORM] = platform

if CONF_STATE_TOPIC not in payload:
payload[CONF_STATE_TOPIC] = '{}/{}/{}{}/state'.format(
discovery_topic, component,
'%s/' % node_id if node_id else '', object_id)

hass.data[ALREADY_DISCOVERED][discovery_hash] = None
payload[ATTR_DISCOVERY_HASH] = discovery_hash

if ALREADY_DISCOVERED not in hass.data:
hass.data[ALREADY_DISCOVERED] = {}
if discovery_hash in hass.data[ALREADY_DISCOVERED]:
# Dispatch update
_LOGGER.info(
"Component has already been discovered: %s %s, sending update",
component, discovery_id)
async_dispatcher_send(
hass, MQTT_DISCOVERY_UPDATED.format(discovery_hash), payload)
elif payload:
# Add component
_LOGGER.info("Found new component: %s %s", component, discovery_id)
hass.data[ALREADY_DISCOVERED][discovery_hash] = None

if platform not in CONFIG_ENTRY_PLATFORMS.get(component, []):
await async_load_platform(
Expand Down
51 changes: 51 additions & 0 deletions homeassistant/components/mqtt/subscription.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""
Helper to handle a set of topics to subscribe to.

For more details about this component, please refer to the documentation at
https://home-assistant.io/components/mqtt/
"""
import logging

from homeassistant.components import mqtt
from homeassistant.components.mqtt import DEFAULT_QOS, MessageCallbackType
from homeassistant.loader import bind_hass
from homeassistant.helpers.typing import (
HomeAssistantType)

_LOGGER = logging.getLogger(__name__)


@bind_hass
async def async_subscribe_topics(hass: HomeAssistantType, sub_state: dict,
balloob marked this conversation as resolved.
Show resolved Hide resolved
new_topics: dict,
msg_callback: MessageCallbackType,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not have a generic msg callback for all topics. For example, this won't work for the MQTT light, as it has a different callback for brightness and state.

I would expect new_topics to be called topics (as there might nothing be new about it) and be defined like this:

{
  '/home/light/kitchen/state': {
    'qos': 2,
    'encoding': 'utf-8',
    'msg_callback': state_received
  }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

qos: int = DEFAULT_QOS,
encoding: str = 'utf-8'):
"""(Re)Subscribe to a set of MQTT topics.

State is kept in sub_state.
"""
if sub_state is None:
sub_state = {'topics': {}}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reason to wrap it in a dictionary. It's an opaque object to the outside world, inside this method we always know the implementation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, fixed.

old_topics = sub_state['topics']
for key, sub in list(old_topics.items()):
emontnemery marked this conversation as resolved.
Show resolved Hide resolved
topic = sub[0]
unsub = sub[1]
if key not in new_topics or topic != new_topics[key]:
if unsub is not None:
unsub()
del old_topics[key]
for key, topic in new_topics.items():
if key not in old_topics and topic is not None:
unsub = await mqtt.async_subscribe(hass, topic, msg_callback, qos)
old_topics[key] = (topic, unsub)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's kinda confusing that we have an object called old_topics that we're writing to. That's also why we need to wrap the for-loop with a list around values.

We should rename old_topic to cur_topics and create a new state dictionary. Then just while iterating over the new_topics param, we pop keys from cur_topics. When done with the for-loop, anything that's left in cur_topics can be unsubscribed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, rewritten as you suggest.


return sub_state


@bind_hass
async def async_unsubscribe_topics(hass: HomeAssistantType, sub_state: dict):
"""Unsubscribe from all MQTT topics managed by async_subscribe_topics."""
await async_subscribe_topics(hass, sub_state, {}, None)

return sub_state
1 change: 1 addition & 0 deletions tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ def async_mock_mqtt_component(hass, config=None):
with patch('paho.mqtt.client.Client') as mock_client:
mock_client().connect.return_value = 0
mock_client().subscribe.return_value = (0, 0)
mock_client().unsubscribe.return_value = (0, 0)
mock_client().publish.return_value = (0, 0)

result = yield from async_setup_component(hass, mqtt.DOMAIN, {
Expand Down
Loading