-
Notifications
You must be signed in to change notification settings - Fork 0
/
entrypoint.py
47 lines (38 loc) · 1002 Bytes
/
entrypoint.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
"""API entrypoint."""
import asyncio
import os
from pyblinky import AsyncWemo
import uvicorn
from src import config
async def main():
"""App entrypoint."""
auto_reload = os.environ.get('AUTO_RELOAD', '').lower() == 'true'
num_workers = int(os.environ.get('NUM_WORKERS', 3))
# determine active plugs before worker forking
plugs = [
AsyncWemo(ip)
for ip in config.PLUG_IPS
]
results = await asyncio.gather(
*[
plug.identify()
for plug in plugs
],
return_exceptions=True
)
active_plug_ips = [
plug.ip
for plug, result in zip(plugs, results)
if not isinstance(result, Exception)
]
os.environ['ACTIVE_PLUG_IPS'] = ','.join(active_plug_ips)
uvicorn.run(
'src.app:app',
host='0.0.0.0',
port=5000,
reload=auto_reload,
workers=num_workers,
log_config='log.ini'
)
if __name__ == '__main__':
asyncio.run(main())