-
Notifications
You must be signed in to change notification settings - Fork 10
/
sec_payload_tx_example.py
144 lines (102 loc) · 4.8 KB
/
sec_payload_tx_example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/env python
#
# Example generation of some basic 'Secondary Payload' packets.
# This is a method by which another process or payload can inject data into
# the Wenet downlink telemetry channel.
#
# Refer to the equivalent sec_payload_rx_example in wenet/rx/
# for how to receive these packets on the ground-station end.
#
# Copyright (C) 2018 Mark Jessop <[email protected]>
# Released under GNU GPL v3 or later
#
import socket, struct, time, json, random
def emit_secondary_packet(id=0, packet="", repeats = 1, hostname='<broadcast>', port=55674):
""" Send a Secondary Payload data packet into the network, to (hopefully) be
transmitted by a Wenet transmitter.
Keyword Arguments:
id (int): Payload ID number, 0 - 255
packet (): Packet data, packed as a byte array. Maximum of 254 bytes in size.
repeats (int): Number of times to re-transmit this packet. Defaults to 1.
hostname (str): Hostname of the Wenet transmitter. Defaults to using UDP broadcast.
port (int): UDP port of the Wenet transmitter. Defaults to 55674.
"""
telemetry_socket = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
# Set up the telemetry socket so it can be re-used.
telemetry_socket.setsockopt(socket.SOL_SOCKET,socket.SO_BROADCAST,1)
telemetry_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# We need the following if running on OSX.
try:
telemetry_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except:
pass
# Place data into dictionary.
data = {'type': 'WENET_TX_SEC_PAYLOAD', 'id': int(id), 'repeats': int(repeats), 'packet': list(bytearray(packet))}
# Send to target hostname. If this fails just send to localhost.
try:
telemetry_socket.sendto(json.dumps(data).encode(), (hostname, port))
except socket.error:
telemetry_socket.sendto(json.dumps(data).encode(), ('127.0.0.1', port))
telemetry_socket.close()
# Global text message counter.
text_message_counter = 0
def create_text_message(message):
""" Create a text message packet, for transmission within a 'secondary payload' message.
This is in the same format as a standard wenet text message, however the maximum message
length is shortened by 2 bytes to 250 bytes, due to the extra header overhead.
Keyword Arguments:
message(str): A text message as a string, up to 250 characters in length.
"""
global text_message_counter
text_message_counter = (text_message_counter+1)%65536
# Clip message if required.
if len(message) > 250:
message = message[:250]
# We will use the Wenet standard text message format, which has a packet type of 0x00,
# and consists of a length field, a message count, and then the message itself.
_PACKET_TYPE = 0x00
_PACKET_LEN = len(message)
_PACKET_COUNTER = text_message_counter
# Assemble the packet.
_packet = struct.pack(">BBH", _PACKET_TYPE, _PACKET_LEN, _PACKET_COUNTER) + message.encode('ascii')
return _packet
def create_arbitrary_float_packet(data=[0.0, 0.1]):
""" Create a payload that contains a list of floating point numbers.
Keyword Arguments:
data (list): A list of floating point numbers to package and send.
"""
# Clip the amount of numbers to send to a maximum of 63 (we only have 252 bytes to fit data into)
if len(data) > 63:
data = data[:63]
# Our packet format will consist of a packet type, a length field, and then the data.
_PACKET_TYPE = 0x10 # We will define a packet type of 0x10 to be a list of floats.
_PACKET_LEN = len(data)
# Convert the list of floats into a byte array representation.
_float_bytes = b""
for _val in data:
_float_bytes += struct.pack(">f", _val)
# Now assemble the final packet.
_packet = struct.pack(">BB", _PACKET_TYPE, _PACKET_LEN) + _float_bytes
return _packet
if __name__ == "__main__":
# Define ourselves to be 'sub-payload' number 3.
PAYLOAD_ID = 3
try:
while True:
# Create and transmit a text message packet
_txt_packet = create_text_message("This is a test from payload %d" % PAYLOAD_ID)
emit_secondary_packet(id=PAYLOAD_ID, packet=_txt_packet)
print("Sent Text Message.")
time.sleep(1)
# Generate some random numbers and send them.
_len = random.randint(1,63)
_data = []
for i in range(_len):
_data.append(random.random())
_float_packet = create_arbitrary_float_packet(_data)
emit_secondary_packet(id=PAYLOAD_ID, packet=_float_packet)
print("Sent list of floats: " + str(_data))
time.sleep(1)
# Keep going unless we get a Ctrl + C event
except KeyboardInterrupt:
print("Closing")