-
Notifications
You must be signed in to change notification settings - Fork 0
/
zmq_receiver.py
207 lines (171 loc) · 7.42 KB
/
zmq_receiver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
"""
File: zmq_receiver.py
This is ZMQ receiver designed specifically to request/get and parse messages to/from Pixelator STXM controller.
Author: Igor Beinik
Date: 2024-05-21
"""
import zmq
import json
from pprint import pprint, pformat
from datetime import datetime
from PyQt5.QtCore import QThread
class ZMQReceiver(QThread):
def __init__(self, main_json_identifiers, url_receiv, url_request, timeout=500, debug=False, parent=None):
super().__init__(parent)
self.main_json_identifiers = main_json_identifiers
self.url_receiv = url_receiv
self.url_request = url_request
self.running = False
self.init_response = None
self.init_response_dict = {}
self.timeout = timeout
self.debug = debug
self.pix_ctrl_status = {}
self.positioner_def = {}
self.detector_def = {}
self.oscilloscope_def = {}
self.zone_plate_def = {}
self.remote_file_system_nfo = {}
# Subscribing to the zmq stream from Pixelator
self.context = zmq.Context()
self.socket = self.context.socket(zmq.SUB)
self.socket.connect(self.url_receiv)
self.socket.setsockopt_string(zmq.SUBSCRIBE, '')
## Set up ZMQ request socket
self.REQsocket = self.context.socket(zmq.REQ)
self.REQsocket.connect(self.url_request)
self.REQsocket.setsockopt(zmq.LINGER, 0)
self.parsed_data = {}
self.parsed_data_dict = {}
self.current_identifier = None
self.current_json_data = []
self.get_init_pixelator()
def parse_binary_message(self, binary_msg):
if binary_msg in self.main_json_identifiers:
return True, binary_msg # This message is an identifier
else:
try:
return False, json.loads(binary_msg.decode('utf-8'))
except json.JSONDecodeError:
return False, binary_msg # Return the message as-is if not JSON
def run(self):
try:
while self.running:
try:
binary_message = self.socket.recv()
if self.debug:
print("New message! Binary message is: ", binary_message)
is_identifier, data = self.parse_binary_message(binary_message)
if is_identifier:
# Handle identifier message
if self.current_identifier is not None:
self.parsed_data[self.current_identifier.decode('utf-8')] = self.current_json_data
self.current_identifier = data
self.current_json_data = []
else:
# Handle value message
if self.current_identifier is not None:
self.current_json_data.append(data)
else:
if self.debug:
print("WARNING: Value message received without an identifier. Ignoring.")
self.parsed_data_dict = self._convert_to_dict([self.parsed_data])[0]
if self.debug:
print("Final Parsed Data: ")
for key, value in self.parsed_data.items():
print(f"{key}: {value}")
except Exception as e:
if self.debug:
print(f"Error occurred: {e}")
# TO-DO Implement automatic recovery logic here (e.g., reconnect)
# One can use try-except blocks or external methods to initiate reconnection
except KeyboardInterrupt:
print("Interrupted by the user, stopping...")
finally:
# Ensure last values are stored
if self.current_identifier is not None:
self.parsed_data[self.current_identifier.decode('utf-8')] = self.current_json_data
self.socket.close()
self.context.term()
if self.debug:
print(f"Final Parsed Data: {self.parsed_data}")
def start_receiver(self):
self.running = True
self.start()
def stop_receiver(self):
self.running = False
self.wait()
def clear_parsed_data(self):
self.parsed_data = {}
def zmq_request(self, command):
"""
This function sends a command through the specified ZMQ request port
and returns the response from the ZMQ server
"""
def isListOfStrings(data):
if type(data) != list:
return False
for d in data:
if type(d) != str: ## Python 3 str = unicode
return False
return True
# check data
if not isListOfStrings(command):
raise Exception("ERROR >> zmq_request needs a list of strings (use json.dumps if you have a dictionary)")
# something to send?
if len(command) == 0: # nothing to send
print("WARNING >> zmq_request called without data")
return ''
try:
# send all but last part
for i in range(len(command)-1):
self.REQsocket.send_string(command[i], flags=zmq.SNDMORE)
# send last part
self.REQsocket.send_string(command[-1])
except zmq.error.ZMQError:
self.zmqREQconnect()
response = None
if (self.REQsocket.poll(self.timeout) & zmq.POLLIN) != 0:
response = [json.loads(x.decode()) for x in self.REQsocket.recv_multipart(zmq.NOBLOCK)]
self.REQ_response = True
self.time_last_message = datetime.now()
if not (type(response) is list and response[0] == {'status':'ok'}): #responds with error message
print(f"ZMQ ERROR >> {response[0]['message']}")
else: #when no response at all
self.REQ_response = False
return response
def get_init_pixelator(self):
'''Request initial data from the Pixelator server.'''
self.init_response = self.zmq_request(['initialize'])
self.init_response_dict = self._convert_to_dict(self.init_response)
self.pix_ctrl_status = self.init_response_dict[0]
self.positioner_def = self.init_response_dict[1]
self.detector_def = self.init_response_dict[2]
self.oscilloscope_def = self.init_response_dict[3]
self.zone_plate_def = self.init_response_dict[4]
self.remote_file_system_nfo = self.init_response_dict[5]
formatted_output = pformat(self.positioner_def)
# Write the string to a file
with open('output.txt', 'w') as f:
f.write(formatted_output)
pprint(self.positioner_def)
def _convert_to_dict(self, lst):
result = {}
for i, item in enumerate(lst):
if isinstance(item, list):
result[i] = self._convert_to_dict(item)
elif isinstance(item, dict):
result[i] = {k: self._convert_to_dict(v) if isinstance(v, list) else v for k, v in item.items()}
else:
result[i] = item
return result
def set_debug(self, debug_enabled):
self.debug = debug_enabled
# Example usage:
# This part can be included in the main script where the module is used
# receiver = ZMQReceiver(
# main_json_identifiers=[b'positionerDefinition', ...],
# url="tcp://b-softimax-sophie-cc-0:55561",
# debug=True
# )
# receiver.start_receiver()