forked from gkreitz/homeassistant-grohe_sense
-
Notifications
You must be signed in to change notification settings - Fork 3
/
switch.py
133 lines (106 loc) · 5.21 KB
/
switch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import logging
from datetime import (timedelta)
from homeassistant.components.switch import SwitchEntity
from homeassistant.helpers.entity import Entity
from homeassistant.util import Throttle
from homeassistant.const import (STATE_UNKNOWN)
from . import (DOMAIN, BASE_URL, GROHE_SENSE_TYPE, GROHE_SENSE_GUARD_TYPE, GROHE_BLUE_HOME_TYPE)
_LOGGER = logging.getLogger(__name__)
VALVE_UPDATE_DELAY = timedelta(minutes=1)
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
_LOGGER.debug("Starting Grohe Sense valve switch")
auth_session = hass.data[DOMAIN]['session']
entities = []
for device in filter(lambda d: d.type == GROHE_SENSE_GUARD_TYPE, hass.data[DOMAIN]['devices']):
entities.append(GroheSenseGuardValve(auth_session, device.locationId, device.roomId, device.applianceId, device.name))
if entities:
async_add_entities(entities)
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
_LOGGER.debug("Starting Grohe Blue Home tap")
auth_session = hass.data[DOMAIN]['session']
entities = []
for device in filter(lambda d: d.type == GROHE_BLUE_HOME_TYPE, hass.data[DOMAIN]['devices']):
entities.append(GroheBlueHomeTap(auth_session, device.locationId, device.roomId, device.applianceId, device.name))
if entities:
async_add_entities(entities)
class GroheSenseGuardValve(SwitchEntity):
def __init__(self, auth_session, locationId, roomId, applianceId, name):
self._auth_session = auth_session
self._locationId = locationId
self._roomId = roomId
self._applianceId = applianceId
self._name = name
self._is_on = STATE_UNKNOWN
@property
def name(self):
return '{} valve'.format(self._name)
@property
def is_on(self):
return self._is_on
@property
def icon(self):
return 'mdi:water'
@property
def device_class(self):
return 'switch'
@Throttle(VALVE_UPDATE_DELAY)
async def async_update(self):
command_response = await self._auth_session.get(BASE_URL + f'locations/{self._locationId}/rooms/{self._roomId}/appliances/{self._applianceId}/command')
if 'command' in command_response and 'valve_open' in command_response['command']:
self._is_on = command_response['command']['valve_open']
else:
_LOGGER.error('Failed to parse out valve_open from commands response: %s', command_response)
async def _set_state(self, state):
data = { 'type': GROHE_SENSE_GUARD_TYPE, 'command': { 'valve_open': state } }
command_response = await self._auth_session.post(BASE_URL + f'locations/{self._locationId}/rooms/{self._roomId}/appliances/{self._applianceId}/command', data)
if 'command' in command_response and 'valve_open' in command_response['command']:
self._is_on = command_response['command']['valve_open']
else:
_LOGGER.warning('Got unknown response back when setting valve state: %s', command_response)
async def async_turn_on(self, **kwargs):
_LOGGER.info('Turning on water for %s', self._name)
await self._set_state(True)
async def async_turn_off(self, **kwargs):
_LOGGER.info('Turning off water for %s', self._name)
await self._set_state(False)
class GroheBlueHomeTap(SwitchEntity):
def __init__(self, auth_session, locationId, roomId, applianceId, name):
self._auth_session = auth_session
self._locationId = locationId
self._roomId = roomId
self._applianceId = applianceId
self._name = name
self._is_on = STATE_UNKNOWN
@property
def name(self):
return '{} valve'.format(self._name)
@property
def is_on(self):
return self._is_on
@property
def icon(self):
return 'mdi:water'
@property
def device_class(self):
return 'switch'
@Throttle(VALVE_UPDATE_DELAY)
async def async_update(self):
command_response = await self._auth_session.get(BASE_URL + f'locations/{self._locationId}/rooms/{self._roomId}/appliances/{self._applianceId}/command')
if 'command' in command_response and 'valve_open' in command_response['command']:
self._is_on = command_response['command']['valve_open']
else:
self._is_on = False
_LOGGER.error('Failed to parse out valve_open from commands response: %s', command_response)
async def _set_state(self, state):
data = { 'command': { 'tap_type': 1, "tap_amount": 20 } }
command_response = await self._auth_session.post(BASE_URL + f'locations/{self._locationId}/rooms/{self._roomId}/appliances/{self._applianceId}/command', data)
if 'command' in command_response and 'valve_open' in command_response['command']:
self._is_on = command_response['command']['valve_open']
else:
_LOGGER.warning('Got unknown response back when setting valve state: %s', command_response)
async def async_turn_on(self, **kwargs):
_LOGGER.info('Turning on water for %s', self._name)
await self._set_state(True)
async def async_turn_off(self, **kwargs):
_LOGGER.info('Turning off water for %s', self._name)
await self._set_state(False)