-
Notifications
You must be signed in to change notification settings - Fork 191
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #167 from Felix-Pi/master
Added SocketDevice
- Loading branch information
Showing
3 changed files
with
118 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
# TinyTuya Outlet Device | ||
# -*- coding: utf-8 -*- | ||
""" | ||
Python module to interface with Tuya Socket Devices | ||
Author: Felix Pieschka | ||
For more information see https://github.com/Felix-Pi | ||
Local Control Classes | ||
SocketDevice(dev_id, address, local_key=None, dev_type='default', version=3.3) | ||
Functions | ||
SocketDevice: | ||
get_energy_consumption() | ||
get_current() | ||
get_power() | ||
get_get_voltage() | ||
get_state() | ||
Inherited | ||
json = status() # returns json payload | ||
set_version(version) # 3.1 [default] or 3.3 | ||
set_socketPersistent(False/True) # False [default] or True | ||
set_socketNODELAY(False/True) # False or True [default] | ||
set_socketRetryLimit(integer) # retry count limit [default 5] | ||
set_socketTimeout(timeout) # set connection timeout in seconds [default 5] | ||
set_dpsUsed(dps_to_request) # add data points (DPS) to request | ||
add_dps_to_request(index) # add data point (DPS) index set to None | ||
set_retry(retry=True) # retry if response payload is truncated | ||
set_status(on, switch=1, nowait) # Set status of switch to 'on' or 'off' (bool) | ||
set_value(index, value, nowait) # Set int value of any index. | ||
heartbeat(nowait) # Send heartbeat to device | ||
updatedps(index=[1], nowait) # Send updatedps command to device | ||
turn_on(switch=1, nowait) # Turn on device / switch # | ||
turn_off(switch=1, nowait) # Turn off | ||
set_timer(num_secs, nowait) # Set timer for num_secs | ||
set_debug(toggle, color) # Activate verbose debugging output | ||
set_sendWait(num_secs) # Time to wait after sending commands before pulling response | ||
detect_available_dps() # Return list of DPS available from device | ||
generate_payload(command, data) # Generate TuyaMessage payload for command with data | ||
send(payload) # Send payload to device (do not wait for response) | ||
receive() | ||
""" | ||
|
||
from ..core import Device | ||
|
||
|
||
class SocketDevice(Device): | ||
""" | ||
Represents a Tuya based Socket | ||
Args: | ||
dev_id (str): The device id. | ||
address (str): The network address. | ||
local_key (str, optional): The encryption key. Defaults to None. | ||
version (float, optional): Tuya Device Version (look at Device.set_version) | ||
""" | ||
|
||
DPS_STATE = '1' | ||
DPS_CURRENT = '18' | ||
DPS_POWER = '19' | ||
DPS_VOLTAGE = '20' | ||
|
||
def __init__(self, dev_id, address, local_key="", dev_type="default", version=None): | ||
super(SocketDevice, self).__init__(dev_id, address, local_key, dev_type) | ||
|
||
if version is not None: | ||
self.set_version(version) | ||
|
||
def get_energy_consumption(self): | ||
data = self.status() | ||
return {**self.get_current(data), **self.get_power(data), **self.get_voltage(data)} | ||
|
||
def get_current(self, status_data=None): | ||
if status_data is None: | ||
status_data = self.status() | ||
|
||
current = status_data['dps'][self.DPS_CURRENT] | ||
|
||
return {'current_raw': current, | ||
'current_fmt': str(current) + ' mA', } | ||
|
||
def get_power(self, status_data=None): | ||
if status_data is None: | ||
status_data = self.status() | ||
|
||
power = status_data['dps'][self.DPS_POWER] / 10 | ||
|
||
return {'power_raw': power, | ||
'power_fmt': str(power) + ' W', } | ||
|
||
def get_voltage(self, status_data=None): | ||
if status_data is None: | ||
status_data = self.status() | ||
|
||
voltage = status_data['dps'][self.DPS_VOLTAGE] / 10 | ||
|
||
return {'voltage_raw': voltage, | ||
'voltage_fmt': str(voltage) + ' V'} | ||
|
||
def get_state(self): | ||
return {'on': self.status()['dps'][self.DPS_STATE]} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
|
||
from .ThermostatDevice import ThermostatDevice | ||
from .IRRemoteControlDevice import IRRemoteControlDevice | ||
from .SocketDevice import SocketDevice | ||
|
||
DeviceTypes = ["ThermostatDevice", "IRRemoteControlDevice"] | ||
DeviceTypes = ["ThermostatDevice", "IRRemoteControlDevice", "SocketDevice"] |