This repository has been archived by the owner on Jun 7, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathmanual_test.py
77 lines (64 loc) · 1.68 KB
/
manual_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
from argparse import ArgumentParser
from asyncio import get_event_loop
from functools import partial
from logging.config import dictConfig
from sys import stdout
from rfxcom import protocol
from rfxcom.transport import AsyncioTransport
# Logging configuration in ``logging.config.dictConfig`` schema: records from
# the ``rfxcom`` logger (DEBUG and above) are sent to both stdout and a
# size-capped rotating log file.
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'standard': {
            'format': '%(asctime)s %(levelname)-8s %(name)-35s %(message)s',
        },
    },
    'handlers': {
        'console': {
            'level': 'DEBUG',
            'class': 'logging.StreamHandler',
            'stream': stdout,
            'formatter': 'standard',
        },
        'file': {
            'level': 'DEBUG',
            'class': 'logging.handlers.RotatingFileHandler',
            'formatter': 'standard',
            'filename': '/tmp/rfxcom.log',
            'maxBytes': 10 * 1024 * 1024,
            # RotatingFileHandler never rolls over while backupCount is 0
            # (its default), which would silently turn maxBytes into a no-op
            # and let the file grow without bound. Keep one rotated backup so
            # the 10 MiB cap actually takes effect.
            'backupCount': 1,
        },
    },
    'loggers': {
        'rfxcom': {
            'handlers': ['console', 'file'],
            'propagate': True,
            'level': 'DEBUG',
        },
    },
}
def printer(packet):
    """Write the string form of ``packet`` to standard output on its own line."""
    text = str(packet)
    print(text)
def write(rfxcom):
    """Placeholder write hook: accepts ``rfxcom`` and does nothing.

    The argument is currently unused and the function always returns ``None``.
    """
    return None
def main():
    """Run the manual test: print every packet received from a device.

    Reads one positional command-line argument, ``device``, and hands it to
    ``AsyncioTransport`` together with the event loop and a callback mapping
    that routes ``protocol.Status``, ``protocol.Elec``,
    ``protocol.TempHumidity`` and the ``'*'`` key (presumably a catch-all --
    confirm against ``rfxcom.transport``) to ``printer``. ``write`` is
    scheduled two seconds after start-up, then the loop runs until it is
    interrupted. The loop is always closed on the way out.
    """
    # Imported locally so this function brings its own asyncio helpers into
    # scope without relying on the module-level import block.
    from asyncio import new_event_loop, set_event_loop

    # Parse the command line first: a usage error or ``--help`` exits right
    # here, before the log file is opened or an event loop is created.
    parser = ArgumentParser()
    parser.add_argument('device')
    args = parser.parse_args()

    dictConfig(LOGGING)

    # Create the loop explicitly. Relying on get_event_loop() to create one
    # implicitly is deprecated and raises RuntimeError on Python 3.14+.
    # Registering it as the current loop keeps any get_event_loop() lookups
    # made by library code working as before.
    loop = new_event_loop()
    set_event_loop(loop)
    try:
        rfxcom = AsyncioTransport(args.device, loop, callbacks={
            protocol.Status: printer,
            protocol.Elec: printer,
            protocol.TempHumidity: printer,
            '*': printer,
        })
        # call_later forwards positional arguments to the callback itself.
        loop.call_later(2, write, rfxcom)
        loop.run_forever()
    finally:
        loop.close()
if __name__ == "__main__":
    # main() runs its event loop forever, so Ctrl-C is the normal way to stop
    # the script; swallow the interrupt to exit quietly without a traceback.
    try:
        main()
    except KeyboardInterrupt:
        pass