From 42f992675868ed6ea3790f98405e7877d1692d1f Mon Sep 17 00:00:00 2001 From: Jem Date: Wed, 9 Oct 2019 22:37:20 +0800 Subject: [PATCH 1/2] fix(client): fix bugs for client --- gnes/client/base.py | 8 +++++--- gnes/client/stream.py | 8 ++++---- gnes/preprocessor/video/frame_select.py | 1 + gnes/service/frontend.py | 3 +++ 4 files changed, 13 insertions(+), 7 deletions(-) diff --git a/gnes/client/base.py b/gnes/client/base.py index ce4bbd66..61d45bb3 100644 --- a/gnes/client/base.py +++ b/gnes/client/base.py @@ -63,9 +63,11 @@ def get_default_fn(r_type): fn = self.routes.get(resp_type) else: fn = get_default_fn(type(resp)) - - self.logger.info('handling response with %s' % fn.__name__) - return fn(self._context, resp) + self.logger.info('handling response with %s' % fn.__name__) + return fn(self._context, resp) + else: + self.logger.warning('the received message is not a response') + return None class ZmqClient: diff --git a/gnes/client/stream.py b/gnes/client/stream.py index 7adde956..6df6051e 100644 --- a/gnes/client/stream.py +++ b/gnes/client/stream.py @@ -44,7 +44,7 @@ class StreamingClient(GrpcClient): def __init__(self, args): super().__init__(args) - self._request_queue = queue.Queue(maxsize=1000) + self._request_queue = queue.Queue(maxsize=10) self._is_streaming = threading.Event() self._dispatch_thread = threading.Thread(target=self._start) @@ -63,16 +63,16 @@ def _start(self): self._is_streaming.clear() def _request_generator(self): - while self._is_streaming.is_set(): + while True: try: request = self._request_queue.get(block=True, timeout=5.0) if request is None: break yield request except queue.Empty: - continue + break except Exception as e: - print('exception: %s' % str(e)) + self.logger.error('exception: %s' % str(e)) break @handler.register(NotImplementedError) diff --git a/gnes/preprocessor/video/frame_select.py b/gnes/preprocessor/video/frame_select.py index d55aacbb..1e3fe6b6 100644 --- a/gnes/preprocessor/video/frame_select.py +++ b/gnes/preprocessor/video/frame_select.py @@ -46,6 +46,7 @@ def apply(self, doc: 'gnes_pb2.Document') -> None: else: idx = np.sort(np.random.choice(len(images), self.sframes, replace=False)) chunk.blob.CopyFrom(array2blob(images[idx])) + del images else: self.logger.error( 'bad document: "doc.chunks" is empty!') diff --git a/gnes/service/frontend.py b/gnes/service/frontend.py index b74c2727..6d62bae0 100644 --- a/gnes/service/frontend.py +++ b/gnes/service/frontend.py @@ -125,6 +125,9 @@ def get_response(num_recv, blocked=False): with self.zmq_context as zmq_client: for request in request_iterator: + num_recv = max(self.pending_request - self.args.max_pending_request, 0) + yield from get_response(num_recv, num_recv > 0) + zmq_client.send_message(self.add_envelope(request, zmq_client), **self.send_recv_kwargs) self.pending_request += 1 From 6f401905943728cdf0b9f206775ef5ee8347e59a Mon Sep 17 00:00:00 2001 From: Jem Date: Wed, 9 Oct 2019 23:29:16 +0800 Subject: [PATCH 2/2] fix(client): fix bugs for client --- gnes/client/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gnes/client/base.py b/gnes/client/base.py index 61d45bb3..aaaa6862 100644 --- a/gnes/client/base.py +++ b/gnes/client/base.py @@ -66,7 +66,7 @@ def get_default_fn(r_type): self.logger.info('handling response with %s' % fn.__name__) return fn(self._context, resp) else: - self.logger.warning('the received message is not a response') + self.logger.warning('the received message is not response') return None