diff --git a/python/dronin/telemetry.py b/python/dronin/telemetry.py index 8cb019a7a9..eaa2cdedba 100644 --- a/python/dronin/telemetry.py +++ b/python/dronin/telemetry.py @@ -89,9 +89,7 @@ def __init__(self, githash=None, service_in_iter=True, self.filename = name self.acks = set() - self.req_obj = {} - - self.eof = False + self.req_obj = None self.file_id = None @@ -117,10 +115,11 @@ def gotnack_callback(self, obj): # TODO: Need to handle instance id better key = (obj._id, 0) - request = self.req_obj.pop(key, None) + if self.req_obj is not None: + request = self.req_obj.pop(key, None) - if request is not None: - request.completed(None, obj._id) + if request is not None: + request.completed(None, obj._id) def as_filtered_list(self, match_class, filter_cond=None, blocks=True): if isinstance(match_class, str): @@ -283,6 +282,9 @@ def _check_resends(self): # Check for anything that needs retrying--- requests, # acked operations. Do at most one thing per cycle. + if self.req_obj is None: + return + for f in list(self.req_obj.values()): if f.expired(): self.req_obj.pop(f.key()) @@ -313,6 +315,9 @@ def default_cb(val, id_val): key = req.key() with self.cond: + if self.req_obj is None: + self.req_obj = {} + old_req = self.req_obj.pop(key, None) if old_req is not None: @@ -418,7 +423,7 @@ def __wait_ack(self, obj, timeout): if obj in self.acks: return True - if self.eof: + if self._done(): return False diff = expiry - time.time(); @@ -428,26 +433,18 @@ def __wait_ack(self, obj, timeout): self.ack_cond.wait(diff + 0.001) - def __handle_frames(self, frames): + def __handle_frames(self): objs = [] - if frames is None: - self.eof = True - self._close() - elif frames == b'': - return - else: - self.uavtalk.new_data(frames) - - obj = next(self.uavtalk_generator) + obj = next(self.uavtalk_generator, None) - while obj: - if self.do_handshaking: - self.__handle_handshake(obj) + while obj is not None: + if self.do_handshaking: + self.__handle_handshake(obj) - objs.append(obj) + objs.append(obj) - obj = next(self.uavtalk_generator) + obj = next(self.uavtalk_generator, None) # Only traverse the lock when we've processed everything in this # batch. @@ -459,15 +456,19 @@ def __handle_frames(self, frames): for obj in objs: self.last_values[obj.__class__]=obj - key = (obj._id, obj.get_inst_id()) + if self.req_obj is not None: + key = (obj._id, obj.get_inst_id()) - request = self.req_obj.pop(key, None) + request = self.req_obj.pop(key, None) - if request is not None: - request.completed(obj, obj._id) + if request is not None: + request.completed(obj, obj._id) self.cond.notifyAll() + if self.uavtalk.is_eof(): + self._close() + def get_last_values(self): """ Returns the last instance of each kind of object received. """ with self.cond: @@ -506,7 +507,7 @@ def run(): with self.ack_cond: self.cond.notifyAll() self.ack_cond.notifyAll() - self.eof = True + self.uavtalk.set_eof() self._close() t = Thread(target=run, name="telemetry svc thread") @@ -541,20 +542,20 @@ def service_connection(self, timeout=None): if (finish_time is not None) and (expire > finish_time): expire = finish_time - data = self._receive(expire) - self.__handle_frames(data) + did_stuff = self._do_io(expire) + self.__handle_frames() - if self.eof: + if did_stuff: break - if len(data): + if self.uavtalk.is_eof(): break if (finish_time is not None) and (time.time() >= finish_time): break @abstractmethod - def _receive(self, finish_time): + def _do_io(self, finish_time): return # No implementation required, so not abstract @@ -563,7 +564,7 @@ def _send(self, msg): def _done(self): with self.cond: - return self.eof + return self.uavtalk.is_eof() def _close(self): return @@ -584,37 +585,10 @@ def __init__(self, *args, **kwargs): TelemetryBase.__init__(self, do_handshaking=True, gcs_timestamps=False, *args, **kwargs) - self.recv_buf = b'' self.send_buf = b'' self.send_lock = Condition() - def _receive(self, finish_time): - """ Fetch available data from file descriptor. """ - - if self.recv_buf is None: - return None - - # Always do some minimal IO if possible - self._do_io(0) - - if self.recv_buf is None: - return None - - if len(self.recv_buf) < 1: - self._do_io(finish_time) - - if self.recv_buf is None: - return None - - if len(self.recv_buf) < 1: - return b'' - - ret = self.recv_buf - self.recv_buf = b'' - - return ret - def _send(self, msg): """ Send a string to the controller """ @@ -664,10 +638,7 @@ def _do_io(self, finish_time): did_stuff = False - if self.recv_buf is None: - return False - - if len(self.recv_buf) < 1024: + if self.uavtalk.want_more_data(): rd_set.append(self.fd) elif len(self.send_buf) == 0: # If we don't want I/O, return quick! @@ -690,10 +661,9 @@ def _do_io(self, finish_time): try: chunk = os.read(self.fd, 1024) if chunk == b'': - if self.recv_buf == b'': - self.recv_buf = None + self.uavtalk.set_eof() else: - self.recv_buf = self.recv_buf + chunk + self.uavtalk.new_data(chunk) did_stuff = True except OSError as err: @@ -814,7 +784,7 @@ def _do_io(self, finish_time): if chunk != b'': did_stuff = True - self.recv_buf = self.recv_buf + chunk + self.uavtalk.new_data(chunk) except serial.serialutil.SerialException: # Ignore this; looks like a pyserial bug pass @@ -938,7 +908,7 @@ def _do_io(self, finish_time): if chunk != b'': did_stuff = True - self.recv_buf = self.recv_buf + chunk + self.uavtalk.new_data(chunk) with self.send_lock: if self.send_buf != b'': @@ -1042,15 +1012,20 @@ def __init__(self, file_obj, parse_header=False, self.done=False - def _receive(self, finish_time): + def _do_io(self, finish_time): """ Fetch available data from file """ - buf = self.f.read(524288) # 512k + if self.uavtalk.want_more_data(): + buf = self.f.read(524288) # 512k - if buf == b'': - return None + if buf == b'': + self.uavtalk.set_eof() + else: + self.uavtalk.new_data(buf) + + return True - return buf + return False def _finish_telemetry_args(parser, args, service_in_iter, iter_blocks): parse_header = False