forked from martinspinler/osclive
-
Notifications
You must be signed in to change notification settings - Fork 0
/
SLRemote.py
187 lines (159 loc) · 6.46 KB
/
SLRemote.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
import threading
import struct
from socket import *
class SLRemote(object):
# SL Remote server
# Code is based on implementation of vsl1818
def __init__(self, magic, debug = False):
self.magic = magic
self.debug = debug
self.loaded = False
self.killed = False
self.connection = None
self.levels = {}
self.update_thread = None
self.channel_names = {}
self.update_callbacks = []
self.level_callbacks = []
self.mapchannel = {}
self.mapcontrol = {}
self.revchannel = {}
self.revcontrol = {}
# channel_id -> control_id -> value
self.channels = {}
self.channel_max = 0
# Low-level send & receive functions
def parsestr(self, s, n=None):
assert b'\x00' in s
if n is not None:
assert len(s) == n
return (s[:s.find(b'\x00')]).decode()
def recv(self, n):
s = b""
def remaining():
return n-len(s)
while remaining() > 0:
s += self.connection.recv(remaining())
return s
def readmsg(self):
signature, l = struct.unpack("II", self.recv(8))
assert signature == 0xaa550011
return self.recv(l)
def sendmsg(self, message):
self.connection.send(struct.pack("II", 0xaa550011, len(message)) + message)
# Main functions
def connect(self, host, port, name="SL-Remote", ident="1BE8DC6BF62EA577B"):
if not host:
self.loaded = True
return
assert not self.connection
self.connection = socket(AF_INET, SOCK_STREAM)
self.connection.connect((host, port))
self.sendmsg(struct.pack("IIHH32s32s", self.magic[0], self.magic[1], 3, self.magic[2], ident.encode(), name.encode()))
print("SLRemote: starting update thread")
self.update_thread = threading.Thread(target=self.update_process)
self.update_thread.start()
def update(self, message_header, message_body):
unknown1, unknown2, category, unknown3 = struct.unpack("IIHH", message_header)
assert unknown1 == self.magic[0]
assert unknown2 == self.magic[1]
assert unknown3 == self.magic[2]
if category == 5:
self.loaded = True
levels = struct.unpack("128B", message_body)
#print(levels)
for i in range(0, 8):
self.levels["in%d,0" % i] = levels[i]
for i in range(0, 4):
self.levels["in%d,0" % (i+8)] = levels[i*2 + 8]
# Master
self.levels["in16,0"] = levels[20]
#i = 0
#for channel in self.revchannel:
# self.levels[channel] = levels[i]
# i += 1
for callback in self.level_callbacks:
callback()
elif category == 4:
unknown4, channel_id, channel_name = struct.unpack("HH48s", message_body)
channel_name = self.parsestr(channel_name)
assert unknown4 == 0
self.channel_names[channel_id] = channel_name
if(self.debug):
print("SLRemote: recv channel ID %02d name: '%s'" % (channel_id, self.channel_names[channel_id]))
elif category == 2:
control_id, value, channel_id = struct.unpack("=Hd32s", message_body)
channel_id = self.parsestr(channel_id)
if(channel_id not in self.revchannel):
print("SLRemote: recv control update for unknown channel %s" % channel_id)
return
channel = self.revchannel[channel_id]
if(control_id not in self.revcontrol[channel]):
print("SLRemote: recv unknown control update %d for channel %s with value %.2f" % (control_id, channel, value))
return
control = self.revcontrol[channel][control_id]
self.channels[channel][control] = value
if(self.debug):
print("SLRemote: recv control update for %s: %s = %.2f" % (channel, control, value))
for callback in self.update_callbacks:
callback(channel, control, value)
elif category == 7:
# Connected clients?
pass
elif category == 13:
# Number of presets from the disk?
pass
elif category == 14:
# Presets from the disk
pass
elif category == 17:
# Unknown
pass
else:
if(self.debug):
print("SLRemote: recv unknown category %s, message size %d" % (category, len(message_body)))
#print(message_body)
pass
def update_process(self):
assert self.connection
while not self.killed:
message = self.readmsg()
self.update(message[:12], message[12:])
def add_update_callback(self, callback):
self.update_callbacks.append(callback)
def rename_channel(self, old, new):
assert old in self.mapchannel
assert new not in self.mapchannel
rev = self.mapchannel[old]
self.revchannel[rev] = new
self.channels[new] = self.channels.pop(old)
self.revcontrol[new] = self.revcontrol.pop(old)
self.mapcontrol[new] = self.mapcontrol.pop(old)
self.mapchannel[new] = rev
self.mapchannel.pop(old)
def get_level(self, channel):
assert channel in self.mapchannel
return self.levels[self.mapchannel[channel]]
def get_control(self, channel, control):
if(self.debug):
print("SLRemote: Get control %s on channel %s" % (control, channel))
assert channel in self.mapchannel
assert control in self.channels[channel]
return self.channels[channel][control]
def set_control(self, channel, control, value):
if(self.debug):
print("SLRemote: Set control %s on channel %s" % (control, channel))
assert channel in self.mapchannel
assert control in self.channels[channel]
if value > 1:
value = 1
if value < 0:
value = 0
self.channels[channel][control] = value
if not self.connection:
for callback in self.update_callbacks:
callback(channel, control, value)
return
header = struct.pack("IIHH", self.magic[0], self.magic[1], 2, self.magic[2])
body = struct.pack("=Hd32s", self.mapcontrol[channel][control], value, self.mapchannel[channel].encode())
self.sendmsg(header + body)