This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Merge pull request #314 from gnes-ai/feat-sync-dump
feat(service): remove async dump for better stability
mergify[bot] authored Oct 10, 2019
2 parents 17f9287 + 199a71a commit 49150fb
Showing 5 changed files with 35 additions and 45 deletions.
3 changes: 2 additions & 1 deletion gnes/cli/parser.py
@@ -178,7 +178,8 @@ def set_service_parser(parser=None):
parser.add_argument('--timeout', type=int, default=-1,
help='timeout (ms) of all communication, -1 for waiting forever')
parser.add_argument('--dump_interval', type=int, default=5,
help='serialize the service to a file every n seconds, -1 means --read_only')
help='serialize the model in the service every n seconds if the model has changed. '
'-1 means --read_only. ')
parser.add_argument('--read_only', action='store_true', default=False,
help='do not allow the service to modify the model, '
'dump_interval will be ignored')
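
The new help text tightens the contract of `--dump_interval`: the model is serialized at most every n seconds, and only when it has actually changed; `--read_only` disables dumping entirely. A minimal sketch of how the two flags are parsed is shown below; it assumes `set_service_parser()` can be called without arguments and that all of its options have defaults (the tests in this repository use the sibling `set_indexer_parser()` the same way), and it is illustrative rather than part of this commit.

```python
# Illustrative only: exercise the two flags touched by this change.
from gnes.cli.parser import set_service_parser

# --dump_interval > 0: dump at most every n seconds, and only if the model changed
args = set_service_parser().parse_args(['--dump_interval', '10'])
assert args.dump_interval == 10 and not args.read_only

# --read_only wins over any dump_interval: the model is never dumped
args = set_service_parser().parse_args(['--read_only', '--dump_interval', '10'])
assert args.read_only
```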
11 changes: 7 additions & 4 deletions gnes/flow/__init__.py
@@ -62,13 +62,15 @@ class Flow:
You can use `.add()` then `.build()` to customize your own workflow.
For example:
.. highlight:: python
.. code-block:: python
f = (Flow(check_version=False, route_table=True)
.add(gfs.Router, yaml_path='BaseRouter')
.add(gfs.Router, yaml_path='BaseRouter')
.add(gfs.Preprocessor, yaml_path='BasePreprocessor')
.add(gfs.Encoder, yaml_path='BaseEncoder')
.add(gfs.Router, yaml_path='BaseRouter'))
with f.build(backend='thread') as flow:
flow.index()
...
@@ -77,6 +79,7 @@ class Flow:
Note the different default copy behaviors in `.add()` and `.build()`:
`.add()` always copies the flow by default, whereas `.build()` modifies the flow in place.
You can change this behavior by giving an argument `copy_flow=False`.
"""
_supported_orch = {'swarm', 'k8s'}
_service2parser = {
@@ -211,14 +214,14 @@ def add(self, service: 'Service',
**kwargs) -> 'Flow':
"""
Add a service to the current flow object and return the new modified flow object
:param copy_flow: when set to true, always copy the current flow, apply the modification
on top of the copy and return it; otherwise, modify the flow in place
:param service: a 'Service' enum, possible choices: Encoder, Router, Preprocessor, Indexer, Frontend
:param name: the name identifier of the service, useful in 'service_in' and 'service_out'
:param service_in: the name of the service(s) that this service receives data from.
One can also use 'Service.Frontend' to indicate the connection with the frontend.
:param service_out: the name of the service(s) that this service sends data to.
One can also use 'Service.Frontend' to indicate the connection with the frontend.
:param copy_flow: when set to true, always copy the current flow, apply the modification on top of the copy and return it; otherwise, modify the flow in place
:param kwargs: other keyword-value arguments that the service CLI supports
:return: a (new) flow object with modification
"""
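
Since `copy_flow` is now documented first among the parameters, a short sketch of the two copy behaviors may help. It reuses only the calls shown in the docstring above and assumes `gfs` is the `Service` enum exported by `gnes.flow`, which the docstring example abbreviates as `gfs`.

```python
# A sketch of the copy semantics documented above, not a test from this commit.
from gnes.flow import Flow, Service as gfs  # assumed alias for the Service enum

f = Flow(check_version=False, route_table=True)

# .add() copies by default: f stays unchanged, f2 carries the new router
f2 = f.add(gfs.Router, yaml_path='BaseRouter')

# copy_flow=False switches to in-place modification of f itself
f.add(gfs.Router, yaml_path='BaseRouter', copy_flow=False)

# .build() has the opposite default: it modifies the flow in place
# unless copy_flow=True is passed, e.g. f.build(backend='thread', copy_flow=True)
```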
55 changes: 26 additions & 29 deletions gnes/service/base.py
@@ -320,6 +320,7 @@ def __init__(self, args):
self.is_event_loop = self._get_event()
self.is_model_changed = self._get_event()
self.is_handler_done = self._get_event()
self.last_dump_time = time.perf_counter()
self._model = None
self.use_event_loop = True
self.ctrl_addr = 'tcp://%s:%d' % (self.default_host, self.args.port_ctrl)
@@ -335,29 +336,20 @@ def run(self):
except Exception as ex:
self.logger.error(ex, exc_info=True)

def _start_auto_dump(self):
if self.args.dump_interval > 0 and not self.args.read_only:
self._auto_dump_thread = threading.Thread(target=self._auto_dump)
self._auto_dump_thread.setDaemon(1)
self._auto_dump_thread.start()

def _auto_dump(self):
while self.is_event_loop.is_set():
if self.is_model_changed.is_set():
self.is_model_changed.clear()
self.logger.info(
'auto-dumping the new change of the model every %ds...' % self.args.dump_interval)
self.dump()
time.sleep(self.args.dump_interval)

def dump(self):
if not self.args.read_only:
if self._model:
self.logger.info('dumping changes to the model...')
self._model.dump()
self.logger.info('dumping finished!')
else:
self.logger.info('no dumping as "read_only" set to true.')
def dump(self, respect_dump_interval: bool = True):
if (not self.args.read_only
and self.args.dump_interval > 0
and self._model
and self.is_model_changed.is_set()
and (respect_dump_interval
and (time.perf_counter() - self.last_dump_time) > self.args.dump_interval)
or not respect_dump_interval):
self.is_model_changed.clear()
self.logger.info('dumping changes to the model, %3.0fs since the last dump'
% (time.perf_counter() - self.last_dump_time))
self._model.dump()
self.last_dump_time = time.perf_counter()
self.logger.info('dumping finished! the next dump will start in at least %3.0fs' % self.args.dump_interval)

@handler.register_hook(hook_type='post')
def _hook_warn_body_type_change(self, msg: 'gnes_pb2.Message', *args, **kwargs):
@@ -414,17 +406,17 @@ def _run(self, ctx):
self.post_init()
self.is_ready.set()
self.is_event_loop.set()
self._start_auto_dump()
self.logger.critical('ready and listening')
while self.is_event_loop.is_set():
pull_sock = None
socks = dict(poller.poll())
socks = dict(poller.poll(1))
if socks.get(in_sock) == zmq.POLLIN:
pull_sock = in_sock
elif socks.get(ctrl_sock) == zmq.POLLIN:
pull_sock = ctrl_sock
else:
self.logger.error('received message from unknown socket: %s' % socks)
# no message received, pass
continue

if self.use_event_loop or pull_sock == ctrl_sock:
with TimeContext('handling message', self.logger):
self.is_handler_done.clear()
@@ -450,10 +442,13 @@
self.logger.warning(
'received a new message but since "use_event_loop=False" I will not handle it. '
'I will just block the thread until "is_handler_done" is set!')
# wait until someone else calls is_handler_done.set()
self.is_handler_done.wait()
# clear the handler status
self.is_handler_done.clear()
if self.args.dump_interval == 0:
self.dump()

# block the event loop if a dump is needed
self.dump()
except EventLoopEnd:
self.logger.info('break from the event loop')
except ComponentNotLoad:
@@ -466,6 +461,8 @@
in_sock.close()
out_sock.close()
ctrl_sock.close()
# do not apply the dump_interval constraint for this last dump before closing
self.dump(respect_dump_interval=False)
self.logger.critical('terminated')

def post_init(self):
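
This hunk is the heart of the change: the background `_auto_dump` thread is gone and `dump()` now runs synchronously inside the event loop, gated by `dump_interval`. Note that, as written, the trailing `or not respect_dump_interval` is OR-ed against the whole preceding `and` chain (Python's `and` binds tighter than `or`), so a forced dump at shutdown also bypasses the `read_only`, `_model` and `is_model_changed` checks. The standalone sketch below restates the intended gating with those concerns kept separate; the class and its names are illustrative, not the actual `BaseService`.

```python
import time


class DumpGate:
    """Illustrative restatement of the synchronous dump gating in base.py."""

    def __init__(self, dump_interval: int, read_only: bool):
        self.dump_interval = dump_interval
        self.read_only = read_only
        self.model_changed = False
        self.last_dump_time = time.perf_counter()

    def maybe_dump(self, do_dump, respect_dump_interval: bool = True) -> bool:
        """Call do_dump() from the event loop when the gate allows it."""
        if self.read_only or self.dump_interval <= 0 or not self.model_changed:
            return False                      # dumping disabled or nothing new
        elapsed = time.perf_counter() - self.last_dump_time
        if respect_dump_interval and elapsed < self.dump_interval:
            return False                      # too soon since the last dump
        do_dump()                             # blocks the event loop on purpose
        self.model_changed = False
        self.last_dump_time = time.perf_counter()
        return True
```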
5 changes: 0 additions & 5 deletions gnes/service/frontend.py
@@ -131,11 +131,6 @@ def get_response(num_recv, blocked=False):
zmq_client.send_message(self.add_envelope(request, zmq_client), **self.send_recv_kwargs)
self.pending_request += 1

num_recv = max(self.pending_request - self.args.max_pending_request, 1)

# switch to blocked recv when too many pending requests
yield from get_response(num_recv, num_recv > 1)

yield from get_response(self.pending_request, blocked=True)

class ZmqContext:
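
With the partial drain removed, the frontend now sends every request first and collects all pending replies in a single blocking pass afterwards. A toy illustration of that receive pattern follows; it is not the actual GNES frontend code.

```python
from typing import Callable, Iterator, TypeVar

T = TypeVar('T')


def drain_replies(recv_one: Callable[[], T], pending_request: int) -> Iterator[T]:
    """Block until one reply has been received for every outstanding request."""
    for _ in range(pending_request):
        yield recv_one()
```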
6 changes: 0 additions & 6 deletions tests/test_service_mgr.py
@@ -97,8 +97,6 @@ def test_external_module(self):

with ServiceManager(RouterService, args):
pass
self.assertTrue(os.path.exists('foo_contrib_encoder.bin'))
os.remove('foo_contrib_encoder.bin')

def test_override_module(self):
args = set_indexer_parser().parse_args([
@@ -108,8 +106,6 @@ def test_override_module(self):

with ServiceManager(IndexerService, args):
pass
self.assertTrue(os.path.exists('foo_contrib_encoder.bin'))
os.remove('foo_contrib_encoder.bin')

def test_override_twice_module(self):
args = set_indexer_parser().parse_args([
@@ -120,8 +116,6 @@

with ServiceManager(IndexerService, args):
pass
self.assertTrue(os.path.exists('foo_contrib_encoder.bin'))
os.remove('foo_contrib_encoder.bin')

def test_grpc_with_pub(self):
self._test_grpc_multiple_pub('thread', 1)
