Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert tests to use asyncio (major speed up) #122

Merged
merged 2 commits into from
Feb 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pywizlight/bulb.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
BulbResponse = Dict[str, Any]


PORT = 38899

TIMEOUT = 7
SEND_INTERVAL = 0.5
MAX_SEND_DATAGRAMS = int(TIMEOUT / SEND_INTERVAL)
Expand Down Expand Up @@ -348,7 +350,7 @@ class wizlight:
def __init__(
self,
ip: str,
port: int = 38899,
port: int = PORT,
mac: Optional[str] = None,
) -> None:
"""Create instance with the IP address of the bulb."""
Expand Down
70 changes: 31 additions & 39 deletions pywizlight/tests/fake_bulb.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
"""Start up a fake bulb to test features without a real bulb."""
import json
import socketserver
import threading
from typing import Any, Callable, Dict
from typing import cast, Any, Callable, Dict, Tuple
from pywizlight.protocol import WizProtocol
import asyncio


MODULE_CONFIGS = {
("ESP01_SHRGB_03", "1.25.0"): {
Expand Down Expand Up @@ -382,37 +383,38 @@ def get_initial_user_config(module_name: str, firmware_version: str) -> Dict[str
BULB_JSON_ERROR = b'{"env":"pro","error":{"code":-32700,"message":"Parse error"}}'


class BulbUDPRequestHandlerBase(socketserver.DatagramRequestHandler):
class BulbUDPRequestHandler:
"""Class for UDP handler."""

pilot_state: Dict[str, Any] # Will be set by constructor for the actual class
sys_config: Dict[str, Any] # Will be set by constructor for the actual class
model_config: Dict[str, Any] # Will be set by constructor for the actual class
user_config: Dict[str, Any]
transport: asyncio.DatagramTransport

def handle(self) -> None:
def handle(self, resp: bytes, addr: Tuple[str, int]) -> None:
"""Handle the request."""
data = self.rfile.readline().strip()
data = resp.strip()
print(f"Request:{data!r}")
try:
json_data: Dict[str, Any] = dict(json.loads(data.decode()))
except json.JSONDecodeError:
self.wfile.write(BULB_JSON_ERROR)
self.transport.sendto(BULB_JSON_ERROR, addr)
return

method = str(json_data["method"])
if method == "setPilot":
return_data = self.setPilot(json_data)
self.wfile.write(return_data)
self.transport.sendto(return_data, addr)
elif method == "getPilot":
print(f"Response:{json.dumps(self.pilot_state)!r}")
self.wfile.write(bytes(json.dumps(self.pilot_state), "utf-8"))
self.transport.sendto(bytes(json.dumps(self.pilot_state), "utf-8"), addr)
elif method == "getSystemConfig":
self.wfile.write(bytes(json.dumps(self.sys_config), "utf-8"))
self.transport.sendto(bytes(json.dumps(self.sys_config), "utf-8"), addr)
elif method == "getModelConfig":
self.wfile.write(bytes(json.dumps(self.model_config), "utf-8"))
self.transport.sendto(bytes(json.dumps(self.model_config), "utf-8"), addr)
elif method == "getUserConfig":
self.wfile.write(bytes(json.dumps(self.user_config), "utf-8"))
self.transport.sendto(bytes(json.dumps(self.user_config), "utf-8"), addr)
else:
raise RuntimeError(f"No handler for {method}")

Expand All @@ -423,38 +425,28 @@ def setPilot(self, json_data: Dict[str, Any]) -> bytes:
return b'{"method":"setPilot","env":"pro","result":{"success":true}}'


def make_udp_fake_bulb_server(
async def make_udp_fake_bulb_server(
module_name: str, firmware_version: str
) -> socketserver.ThreadingUDPServer:
) -> Tuple[asyncio.BaseTransport, asyncio.BaseProtocol]:
"""Configure a fake bulb instance."""
pilot_state = get_initial_pilot()
sys_config = get_initial_sys_config(module_name, firmware_version)
model_config = get_initial_model_config(module_name, firmware_version)
user_config = get_initial_user_config(module_name, firmware_version)

BulbUDPRequestHandler = type(
"BulbUDPRequestHandler",
(BulbUDPRequestHandlerBase,),
{
"pilot_state": pilot_state,
"sys_config": sys_config,
"model_config": model_config,
"user_config": user_config,
},
)
handler = BulbUDPRequestHandler()
handler.pilot_state = get_initial_pilot()
handler.sys_config = get_initial_sys_config(module_name, firmware_version)
handler.model_config = get_initial_model_config(module_name, firmware_version)
handler.user_config = get_initial_user_config(module_name, firmware_version)

udp_server = socketserver.ThreadingUDPServer(
server_address=("127.0.0.1", 38899),
RequestHandlerClass=BulbUDPRequestHandler,
transport_proto = await asyncio.get_event_loop().create_datagram_endpoint(
lambda: WizProtocol(on_response=handler.handle),
local_addr=("127.0.0.1", 0),
)
return udp_server
handler.transport = cast(asyncio.DatagramTransport, transport_proto[0])
return transport_proto


def startup_bulb(
async def startup_bulb(
module_name: str = "ESP01_SHRGB_03", firmware_version: str = "1.25.0"
) -> Callable[[], Any]:
) -> Tuple[Callable[[], Any], int]:
"""Start up the bulb. Returns a function to shut it down."""
server = make_udp_fake_bulb_server(module_name, firmware_version)
thread = threading.Thread(target=server.serve_forever)
thread.start()
return server.shutdown
transport_proto = await make_udp_fake_bulb_server(module_name, firmware_version)
transport = cast(asyncio.DatagramTransport, transport_proto[0])
return transport.close, transport.get_extra_info("sockname")[1]
16 changes: 6 additions & 10 deletions pywizlight/tests/test_bulb.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,15 @@
from pywizlight.tests.fake_bulb import startup_bulb


@pytest.fixture(scope="module")
def startup_fake_bulb(request: pytest.FixtureRequest) -> None:
shutdown = startup_bulb(module_name="ESP01_SHRGB_03", firmware_version="1.25.0")
request.addfinalizer(shutdown)


@pytest.fixture()
async def correct_bulb(
startup_fake_bulb: pytest.FixtureRequest,
) -> AsyncGenerator[wizlight, None]:
bulb = wizlight(ip="127.0.0.1")
async def correct_bulb() -> AsyncGenerator[wizlight, None]:
shutdown, port = await startup_bulb(
module_name="ESP01_SHRGB_03", firmware_version="1.25.0"
)
bulb = wizlight(ip="127.0.0.1", port=port)
yield bulb
await bulb.async_close()
shutdown()


@pytest.fixture()
Expand Down
6 changes: 4 additions & 2 deletions pywizlight/tests/test_bulb_dimmable_white.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@

@pytest.fixture()
async def dimmable_bulb() -> AsyncGenerator[wizlight, None]:
shutdown = startup_bulb(module_name="ESP05_SHDW_21", firmware_version="1.25.0")
bulb = wizlight(ip="127.0.0.1")
shutdown, port = await startup_bulb(
module_name="ESP05_SHDW_21", firmware_version="1.25.0"
)
bulb = wizlight(ip="127.0.0.1", port=port)
yield bulb
await bulb.async_close()
shutdown()
Expand Down
6 changes: 4 additions & 2 deletions pywizlight/tests/test_bulb_invalid_module_name.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@

@pytest.fixture()
async def no_module_bulb() -> AsyncGenerator[wizlight, None]:
shutdown = startup_bulb(module_name="MISSING", firmware_version="1.16.64")
bulb = wizlight(ip="127.0.0.1")
shutdown, port = await startup_bulb(
module_name="MISSING", firmware_version="1.16.64"
)
bulb = wizlight(ip="127.0.0.1", port=port)
yield bulb
await bulb.async_close()
shutdown()
Expand Down
4 changes: 2 additions & 2 deletions pywizlight/tests/test_bulb_light_strip_1_16_64.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@

@pytest.fixture()
async def light_strip() -> AsyncGenerator[wizlight, None]:
shutdown = startup_bulb(
shutdown, port = await startup_bulb(
module_name="ESP03_SHRGB3_01ABI", firmware_version="1.16.64"
)
bulb = wizlight(ip="127.0.0.1")
bulb = wizlight(ip="127.0.0.1", port=port)
yield bulb
await bulb.async_close()
shutdown()
Expand Down
6 changes: 4 additions & 2 deletions pywizlight/tests/test_bulb_light_strip_1_21_4.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@

@pytest.fixture()
async def light_strip() -> AsyncGenerator[wizlight, None]:
shutdown = startup_bulb(module_name="ESP20_SHRGB_01ABI", firmware_version="1.21.4")
bulb = wizlight(ip="127.0.0.1")
shutdown, port = await startup_bulb(
module_name="ESP20_SHRGB_01ABI", firmware_version="1.21.4"
)
bulb = wizlight(ip="127.0.0.1", port=port)
yield bulb
await bulb.async_close()
shutdown()
Expand Down
6 changes: 4 additions & 2 deletions pywizlight/tests/test_bulb_light_strip_1_25_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@

@pytest.fixture()
async def light_strip() -> AsyncGenerator[wizlight, None]:
shutdown = startup_bulb(module_name="ESP20_SHRGB_01ABI", firmware_version="1.25.0")
bulb = wizlight(ip="127.0.0.1")
shutdown, port = await startup_bulb(
module_name="ESP20_SHRGB_01ABI", firmware_version="1.25.0"
)
bulb = wizlight(ip="127.0.0.1", port=port)
yield bulb
await bulb.async_close()
shutdown()
Expand Down
6 changes: 4 additions & 2 deletions pywizlight/tests/test_bulb_missing_kelvin_range.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@

@pytest.fixture()
async def missing_kelvin_range_bulb() -> AsyncGenerator[wizlight, None]:
shutdown = startup_bulb(module_name="MISSING_KELVIN", firmware_version="1.16.64")
bulb = wizlight(ip="127.0.0.1")
shutdown, port = await startup_bulb(
module_name="MISSING_KELVIN", firmware_version="1.16.64"
)
bulb = wizlight(ip="127.0.0.1", port=port)
yield bulb
await bulb.async_close()
shutdown()
Expand Down
6 changes: 4 additions & 2 deletions pywizlight/tests/test_bulb_no_module_name.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@

@pytest.fixture()
async def invalid_module_bulb() -> AsyncGenerator[wizlight, None]:
shutdown = startup_bulb(module_name="INVALID", firmware_version="1.16.64")
bulb = wizlight(ip="127.0.0.1")
shutdown, port = await startup_bulb(
module_name="INVALID", firmware_version="1.16.64"
)
bulb = wizlight(ip="127.0.0.1", port=port)
yield bulb
await bulb.async_close()
shutdown()
Expand Down
6 changes: 4 additions & 2 deletions pywizlight/tests/test_bulb_rgbtw_1_21_4.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@

@pytest.fixture()
async def rgbtw_bulb() -> AsyncGenerator[wizlight, None]:
shutdown = startup_bulb(module_name="ESP20_SHRGBC_01", firmware_version="1.21.4")
bulb = wizlight(ip="127.0.0.1")
shutdown, port = await startup_bulb(
module_name="ESP20_SHRGBC_01", firmware_version="1.21.4"
)
bulb = wizlight(ip="127.0.0.1", port=port)
yield bulb
await bulb.async_close()
shutdown()
Expand Down
6 changes: 4 additions & 2 deletions pywizlight/tests/test_bulb_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@

@pytest.fixture()
async def socket() -> AsyncGenerator[wizlight, None]:
shutdown = startup_bulb(module_name="ESP10_SOCKET_06", firmware_version="1.25.0")
bulb = wizlight(ip="127.0.0.1")
shutdown, port = await startup_bulb(
module_name="ESP10_SOCKET_06", firmware_version="1.25.0"
)
bulb = wizlight(ip="127.0.0.1", port=port)
yield bulb
await bulb.async_close()
shutdown()
Expand Down
6 changes: 4 additions & 2 deletions pywizlight/tests/test_bulb_turnable_white.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@

@pytest.fixture()
async def turnable_bulb() -> AsyncGenerator[wizlight, None]:
shutdown = startup_bulb(module_name="ESP21_SHTW_01", firmware_version="1.25.0")
bulb = wizlight(ip="127.0.0.1")
shutdown, port = await startup_bulb(
module_name="ESP21_SHTW_01", firmware_version="1.25.0"
)
bulb = wizlight(ip="127.0.0.1", port=port)
yield bulb
await bulb.async_close()
shutdown()
Expand Down