Skip to content

Commit

Permalink
telemetry: improve handling of EOF
Browse files Browse the repository at this point in the history
Also lazy allocate reqobj dict, to avoid doing per-object work on stuff
that never requests.
  • Loading branch information
mlyle committed May 17, 2018
1 parent a00b0c7 commit 0553c39
Showing 1 changed file with 49 additions and 74 deletions.
123 changes: 49 additions & 74 deletions python/dronin/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,7 @@ def __init__(self, githash=None, service_in_iter=True,
self.filename = name

self.acks = set()
self.req_obj = {}

self.eof = False
self.req_obj = None

self.file_id = None

Expand All @@ -117,10 +115,11 @@ def gotnack_callback(self, obj):
# TODO: Need to handle instance id better
key = (obj._id, 0)

request = self.req_obj.pop(key, None)
if self.req_obj is not None:
request = self.req_obj.pop(key, None)

if request is not None:
request.completed(None, obj._id)
if request is not None:
request.completed(None, obj._id)

def as_filtered_list(self, match_class, filter_cond=None, blocks=True):
if isinstance(match_class, str):
Expand Down Expand Up @@ -283,6 +282,9 @@ def _check_resends(self):
# Check for anything that needs retrying--- requests,
# acked operations. Do at most one thing per cycle.

if self.req_obj is None:
return

for f in list(self.req_obj.values()):
if f.expired():
self.req_obj.pop(f.key())
Expand Down Expand Up @@ -313,6 +315,9 @@ def default_cb(val, id_val):
key = req.key()

with self.cond:
if self.req_obj is None:
self.req_obj = {}

old_req = self.req_obj.pop(key, None)

if old_req is not None:
Expand Down Expand Up @@ -418,7 +423,7 @@ def __wait_ack(self, obj, timeout):
if obj in self.acks:
return True

if self.eof:
if self._done():
return False

diff = expiry - time.time();
Expand All @@ -428,26 +433,18 @@ def __wait_ack(self, obj, timeout):

self.ack_cond.wait(diff + 0.001)

def __handle_frames(self, frames):
def __handle_frames(self):
objs = []

if frames is None:
self.eof = True
self._close()
elif frames == b'':
return
else:
self.uavtalk.new_data(frames)

obj = next(self.uavtalk_generator)
obj = next(self.uavtalk_generator, None)

while obj:
if self.do_handshaking:
self.__handle_handshake(obj)
while obj is not None:
if self.do_handshaking:
self.__handle_handshake(obj)

objs.append(obj)
objs.append(obj)

obj = next(self.uavtalk_generator)
obj = next(self.uavtalk_generator, None)

# Only traverse the lock when we've processed everything in this
# batch.
Expand All @@ -459,15 +456,19 @@ def __handle_frames(self, frames):
for obj in objs:
self.last_values[obj.__class__]=obj

key = (obj._id, obj.get_inst_id())
if self.req_obj is not None:
key = (obj._id, obj.get_inst_id())

request = self.req_obj.pop(key, None)
request = self.req_obj.pop(key, None)

if request is not None:
request.completed(obj, obj._id)
if request is not None:
request.completed(obj, obj._id)

self.cond.notifyAll()

if self.uavtalk.is_eof():
self._close()

def get_last_values(self):
""" Returns the last instance of each kind of object received. """
with self.cond:
Expand Down Expand Up @@ -506,7 +507,7 @@ def run():
with self.ack_cond:
self.cond.notifyAll()
self.ack_cond.notifyAll()
self.eof = True
self.uavtalk.set_eof()
self._close()

t = Thread(target=run, name="telemetry svc thread")
Expand Down Expand Up @@ -541,20 +542,20 @@ def service_connection(self, timeout=None):
if (finish_time is not None) and (expire > finish_time):
expire = finish_time

data = self._receive(expire)
self.__handle_frames(data)
did_stuff = self._do_io(expire)
self.__handle_frames()

if self.eof:
if did_stuff:
break

if len(data):
if self.uavtalk.is_eof():
break

if (finish_time is not None) and (time.time() >= finish_time):
break

@abstractmethod
def _receive(self, finish_time):
def _do_io(self, finish_time):
return

# No implementation required, so not abstract
Expand All @@ -563,7 +564,7 @@ def _send(self, msg):

def _done(self):
with self.cond:
return self.eof
return self.uavtalk.is_eof()

def _close(self):
return
Expand All @@ -584,37 +585,10 @@ def __init__(self, *args, **kwargs):
TelemetryBase.__init__(self, do_handshaking=True,
gcs_timestamps=False, *args, **kwargs)

self.recv_buf = b''
self.send_buf = b''

self.send_lock = Condition()

def _receive(self, finish_time):
""" Fetch available data from file descriptor. """

if self.recv_buf is None:
return None

# Always do some minimal IO if possible
self._do_io(0)

if self.recv_buf is None:
return None

if len(self.recv_buf) < 1:
self._do_io(finish_time)

if self.recv_buf is None:
return None

if len(self.recv_buf) < 1:
return b''

ret = self.recv_buf
self.recv_buf = b''

return ret

def _send(self, msg):
""" Send a string to the controller """

Expand Down Expand Up @@ -664,10 +638,7 @@ def _do_io(self, finish_time):

did_stuff = False

if self.recv_buf is None:
return False

if len(self.recv_buf) < 1024:
if self.uavtalk.want_more_data():
rd_set.append(self.fd)
elif len(self.send_buf) == 0:
# If we don't want I/O, return quick!
Expand All @@ -690,10 +661,9 @@ def _do_io(self, finish_time):
try:
chunk = os.read(self.fd, 1024)
if chunk == b'':
if self.recv_buf == b'':
self.recv_buf = None
self.uavtalk.set_eof()
else:
self.recv_buf = self.recv_buf + chunk
self.uavtalk.new_data(chunk)

did_stuff = True
except OSError as err:
Expand Down Expand Up @@ -814,7 +784,7 @@ def _do_io(self, finish_time):

if chunk != b'':
did_stuff = True
self.recv_buf = self.recv_buf + chunk
self.uavtalk.new_data(chunk)
except serial.serialutil.SerialException:
# Ignore this; looks like a pyserial bug
pass
Expand Down Expand Up @@ -938,7 +908,7 @@ def _do_io(self, finish_time):

if chunk != b'':
did_stuff = True
self.recv_buf = self.recv_buf + chunk
self.uavtalk.new_data(chunk)

with self.send_lock:
if self.send_buf != b'':
Expand Down Expand Up @@ -1042,15 +1012,20 @@ def __init__(self, file_obj, parse_header=False,

self.done=False

def _receive(self, finish_time):
def _do_io(self, finish_time):
""" Fetch available data from file """

buf = self.f.read(524288) # 512k
if self.uavtalk.want_more_data():
buf = self.f.read(524288) # 512k

if buf == b'':
return None
if buf == b'':
self.uavtalk.set_eof()
else:
self.uavtalk.new_data(buf)

return True

return buf
return False

def _finish_telemetry_args(parser, args, service_in_iter, iter_blocks):
parse_header = False
Expand Down

0 comments on commit 0553c39

Please sign in to comment.