-
Notifications
You must be signed in to change notification settings - Fork 3
/
testProducerConsumerNotification.py
executable file
·135 lines (121 loc) · 4.59 KB
/
testProducerConsumerNotification.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/env python
'''
Tests a node's response to requests for information
about its producers and consumers.
Note that this doesn't test the messages that
happen as part of reset.
1) Use IdentifyEvents to get list of event IDs
2) Use IdentifyConsumer/IdentifyProducer to make sure each is available
@author: Bob Jacobsen
'''
import connection as connection
import canolcbutils
import identifyEventsAddressed
import identifyConsumers
import identifyProducers
def usage():
    """Print the command-line help text for this script to standard output.

    Each entry of the table below is written on its own line; an empty
    entry produces a blank line.
    """
    # NOTE(review): the wording talks about sending a single
    # IdentifyConsumers message, but test() in this file first sends an
    # addressed IdentifyEvents frame and then queries every reported event.
    # The text looks copied from another script -- confirm and reword.
    help_lines = (
        "",
        "Called standalone, will send one CAN IdentifyConsumers message",
        " and display response",
        "",
        "Expect zero or more ConsumerIdentified reply in return",
        "e.g. [1926Bsss] nn nn nn nn nn nn",
        "containing dest alias and EventID",
        "",
        "Default connection detail taken from connection.py",
        "",
        "-a --alias source alias (default 123)",
        "-d --dest dest alias (default 123)",
        "-t find destination alias automatically",
        "-v verbose",
        "-V very verbose",
    )
    for line in help_lines:
        # A single parenthesised argument prints the same text whether
        # print is the Python 2 statement or the Python 3 function.
        print(line)
import getopt, sys
def main():
    """Parse the command line, run test() once, and return its result code.

    Recognised options:
      -a / --alias  source alias (defaults to connection.thisNodeAlias)
      -d / --dest   destination alias (defaults to connection.testNodeAlias)
      -t            obtain the destination alias from getUnderTestAlias
      -v            verbose output
      -V            verbose output, and also sets connection.network.verbose

    Returns:
        The value returned by test().

    On an option-parsing error the error and the usage text are printed
    and the process exits with status 2.
    """
    alias = connection.thisNodeAlias
    dest = connection.testNodeAlias
    verbose = False
    identifynode = False

    # argument processing
    try:
        opts, _ = getopt.getopt(sys.argv[1:], "d:a:vVt", ["alias=", "dest="])
    except getopt.GetoptError as err:
        # print help information and exit:
        print(str(err))  # will print something like "option -a not recognized"
        usage()
        sys.exit(2)

    for opt, arg in opts:
        if opt == "-v":
            verbose = True
        elif opt == "-V":
            verbose = True
            connection.network.verbose = True
        elif opt in ("-a", "--alias"):
            alias = int(arg)  # needs hex decode
        elif opt in ("-d", "--dest"):
            dest = int(arg)  # needs hex decode
        elif opt == "-t":
            identifynode = True
        else:
            # Explicit raise rather than `assert False` so the check is
            # not stripped when Python runs with -O.
            raise AssertionError("unhandled option")

    if identifynode:
        import getUnderTestAlias
        dest, nodeID = getUnderTestAlias.get(alias, None, verbose)

    # Close the network connection even if test() raises.
    try:
        retval = test(alias, dest, connection, verbose)
    finally:
        connection.network.close()
    return retval
# Frame prefixes this test accepts as a "consumer identified" reply.
_CONSUMER_IDENTIFIED = (":X194C7", ":X194C4", ":X194C5")
# Frame prefixes this test accepts as a "producer identified" reply.
_PRODUCER_IDENTIFIED = (":X19547", ":X19544", ":X19545")


def _report(*parts):
    """Print the parts on one line, separated by single spaces.

    Uses a single-argument print call so the output is the same under
    Python 2 and Python 3.
    """
    print(" ".join([str(part) for part in parts]))


def _check_each(connection, alias, events, framer, accepted,
                missing_msg, base_code):
    """Query every event individually and validate the replies.

    For each event a frame built by framer.makeframe(alias, event) is sent.
    The first reply must start with one of the `accepted` prefixes, and so
    must every further reply received before the receive times out.

    Returns:
        0 when all events pass, otherwise base_code (no reply),
        base_code + 1 (bad first reply) or base_code + 2 (bad later reply).
    """
    for event in events:
        connection.network.send(framer.makeframe(alias, event))
        reply = connection.network.receive()
        if reply is None:
            _report(missing_msg, event)
            return base_code
        if not reply.startswith(accepted):
            print("Unexpected reply " + reply)
            return base_code + 1
        # First reply is OK. Drain anything else that arrives before the
        # timeout, holding it to the same accepted set as the first reply
        # so that a second legitimate answer is not reported as an error.
        while True:
            reply = connection.network.receive()
            if reply is None:
                break
            if not reply.startswith(accepted):
                print("Unexpected reply " + reply)
                return base_code + 2
    return 0


def test(alias, dest, connection, verbose):
    """Check that a node answers individual queries for each of its events.

    Sends an addressed identify-events frame to `dest`, collects the events
    reported as consumed and as produced until receive() returns None, and
    then queries each collected event on its own with
    identifyConsumers / identifyProducers.

    Args:
        alias: source alias used when building frames.
        dest: alias of the node under test.
        connection: object exposing `network` with send(), receive(),
            `timeout` and `verbose`.
        verbose: when true, print each event as it is collected.

    Returns:
        0 on success; 20/21/22 for a missing, unexpected first, or
        unexpected follow-up consumer reply; 30/31/32 for the producer
        equivalents.
    """
    connection.network.send(identifyEventsAddressed.makeframe(alias, dest))

    consumed = []
    produced = []
    while True:
        reply = connection.network.receive()
        if reply is None:
            break
        if reply.startswith(_CONSUMER_IDENTIFIED):
            event = canolcbutils.bodyArray(reply)
            if verbose:
                _report(" consumes ", event)
            consumed.append(event)
        elif reply.startswith(_PRODUCER_IDENTIFIED):
            event = canolcbutils.bodyArray(reply)
            if verbose:
                _report(" produces ", event)
            produced.append(event)

    # now check consumers and producers individually, with a short timeout
    # because every per-event check ends by waiting for a receive timeout
    saved_timeout = connection.network.timeout
    connection.network.timeout = 0.25
    try:
        if connection.network.verbose:
            print("Start individual checks")
        status = _check_each(connection, alias, consumed, identifyConsumers,
                             _CONSUMER_IDENTIFIED, "no reply for consumer ", 20)
        if status == 0:
            status = _check_each(connection, alias, produced, identifyProducers,
                                 _PRODUCER_IDENTIFIED, "no reply for producer ", 30)
        return status
    finally:
        # Restore the caller's timeout on failure paths as well as success.
        connection.network.timeout = saved_timeout
if __name__ == '__main__':
    # Hand main()'s result code to the shell as the process exit status, so
    # a failing test is visible to whatever invoked the script.
    sys.exit(main())