Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resume dim for non i2cs devices #423

Merged
merged 5 commits into from
Jun 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions insteon_mqtt/device/base/Base.py
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,10 @@ def addRefreshData(self, seq, force=False):
if self.db.desc is None or self.db.firmware is None or force:
seq.add(self.get_model)

# If the engine is not known, or force true, run get_engine
if self.db.engine is None or force:
seq.add(self.get_engine)

#-----------------------------------------------------------------------
def get_flags(self, on_done=None):
"""Get the Insteon operational flags field from the device.
Expand Down Expand Up @@ -649,6 +653,23 @@ def get_model(self, on_done=None):
on_done)
self.send(msg, msg_handler)

def _set_operating_flag_msg(self, cmd2):
"""Creates a set operating flags message

This will create the appropriate message based on the device's
engine (extended for i2cs, else standard)

Args:
cmd2 (int): the set operating flags option (i.e. 0x04 to enable
resume dim)
"""
cmd1 = Msg.CmdType.SET_OPERATING_FLAGS
if self.db.engine is not None and self.db.engine < 2:
return Msg.OutStandard.direct(self.addr, cmd1, cmd2)
else:
return Msg.OutExtended.direct(self.addr, cmd1, cmd2,
bytes([0x00] * 14), crc_type='D14')

#-----------------------------------------------------------------------
def sync(self, dry_run=True, refresh=True, sequence=None, on_done=None):
"""Syncs the links on the device.
Expand Down
8 changes: 2 additions & 6 deletions insteon_mqtt/device/base/DimmerBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,10 @@ def set_resume_dim(self, on_done=None, **kwargs):
# 0x05 - Disables resume dim
if resume_dim:
LOG.info("Device %s enabling resume dim", self.label)
cmd2 = 0x04
msg = self._set_operating_flag_msg(0x04)
else:
LOG.info("Device %s disabling resume dim", self.label)
cmd2 = 0x05

cmd1 = Msg.CmdType.SET_OPERATING_FLAGS
msg = Msg.OutExtended.direct(self.addr, cmd1, cmd2,
bytes([0x00] * 14))
msg = self._set_operating_flag_msg(0x05)

# Use the standard command handler which will notify us when the
# command is ACK'ed.
Expand Down
25 changes: 24 additions & 1 deletion tests/device/test_DimmerDev.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,34 @@ def test_handle_on_off_manual(self, test_device, group, cmd1, cmd2, expected):
else:
mocked.assert_not_called()

def test_set_resume_dim(self, test_device):
def test_set_resume_dim_non_i2cs(self, test_device):
# set_resume_dim(self, resume_dim, on_done=None)
for params in ([True, 0x04], [False, 0x05]):
test_device.set_resume_dim(resume_dim=params[0])
assert len(test_device.protocol.sent) == 1
assert isinstance(test_device.protocol.sent[0].msg, Msg.OutStandard)
assert test_device.protocol.sent[0].msg.cmd1 == 0x20
assert test_device.protocol.sent[0].msg.cmd2 == params[1]
test_device.protocol.clear()

lnr0626 marked this conversation as resolved.
Show resolved Hide resolved
def test_set_resume_dim_unknown(self, test_device):
# set_resume_dim(self, resume_dim, on_done=None)
test_device.db.engine = None
for params in ([True, 0x04], [False, 0x05]):
test_device.set_resume_dim(resume_dim=params[0])
assert len(test_device.protocol.sent) == 1
assert isinstance(test_device.protocol.sent[0].msg, Msg.OutStandard)
assert test_device.protocol.sent[0].msg.cmd1 == 0x20
assert test_device.protocol.sent[0].msg.cmd2 == params[1]
test_device.protocol.clear()

def test_set_resume_dim_i2cs(self, test_device):
# set_resume_dim(self, resume_dim, on_done=None)
test_device.db.engine = 2
for params in ([True, 0x04], [False, 0x05]):
test_device.set_resume_dim(resume_dim=params[0])
assert len(test_device.protocol.sent) == 1
assert isinstance(test_device.protocol.sent[0].msg, Msg.OutExtended)
assert test_device.protocol.sent[0].msg.cmd1 == 0x20
assert test_device.protocol.sent[0].msg.cmd2 == params[1]
assert test_device.protocol.sent[0].msg.data == bytes([0x00] * 14)
Expand Down