Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Commit

Permalink
Merge pull request #337 from gnes-ai/fix-flow-5
Browse files Browse the repository at this point in the history
fix(flow): use the recommended flow API to reduce confusion
  • Loading branch information
mergify[bot] authored Oct 16, 2019
2 parents a419c34 + 9331ef5 commit ca9ad83
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 83 deletions.
15 changes: 11 additions & 4 deletions gnes/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ class Flow(TrainableBase):
.. highlight:: python
.. code-block:: python
from gnes.flow import Flow, Service as gfs
from gnes.flow import Flow
f = (Flow(check_version=False, route_table=True)
.add(gfs.Preprocessor, yaml_path='BasePreprocessor')
.add(gfs.Encoder, yaml_path='BaseEncoder')
.add(gfs.Router, yaml_path='BaseRouter'))
.add_preprocessor(yaml_path='BasePreprocessor')
.add_encoder(yaml_path='BaseEncoder')
.add_router(yaml_path='BaseRouter'))
with f.build(backend='thread') as flow:
flow.index()
Expand All @@ -40,6 +40,9 @@ class Flow(TrainableBase):
"""

# a shortcut to the service frontend, removing one extra import
Frontend = Service.Frontend

def __init__(self, with_frontend: bool = True, is_trained: bool = True, *args, **kwargs):
"""
Create a new Flow object.
Expand Down Expand Up @@ -506,6 +509,10 @@ def add(self, service: Union['Service', str],
Add a service to the current flow object and return the new modified flow object.
The attribute of the service can be later changed with :py:meth:`set` or deleted with :py:meth:`remove`
Note there are shortcut versions of this method.
Recommend to use :py:meth:`add_encoder`, :py:meth:`add_preprocessor`,
:py:meth:`add_router`, :py:meth:`add_indexer` whenever possible.
:param service: a 'Service' enum or string, possible choices: Encoder, Router, Preprocessor, Indexer, Frontend
:param name: the name identifier of the service, can be used in 'recv_from',
'send_to', :py:meth:`set` and :py:meth:`remove`.
Expand Down
36 changes: 36 additions & 0 deletions gnes/flow/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from . import Flow


class BaseIndexFlow(Flow):
    """
    BaseIndexFlow defines a common service pipeline when indexing:
    preprocessor -> encoder -> vector indexer & doc indexer -> sync reduce router.

    It cannot be directly used, as all services are using the base module by default.
    You have to use :py:meth:`set` to change the `yaml_path` of each service.
    """

    def __init__(self, *args, **kwargs):
        """
        Build the default indexing topology.

        All positional and keyword arguments are forwarded to :py:meth:`Flow.__init__`.
        """
        super().__init__(*args, **kwargs)
        # NOTE(review): copy_flow=False presumably makes each add_* call mutate
        # this flow in place (returning self) rather than returning a copy, so
        # the whole chain builds up a single flow object — confirm in Flow.add.
        # 'doc_idx' receives directly from 'prep' (bypassing the encoder), and
        # 'sync' waits for both indexers (num_part=2) before reducing.
        (self.add_preprocessor(name='prep', yaml_path='BasePreprocessor', copy_flow=False)
         .add_encoder(name='enc', yaml_path='BaseEncoder', copy_flow=False)
         .add_indexer(name='vec_idx', yaml_path='BaseIndexer', copy_flow=False)
         .add_indexer(name='doc_idx', yaml_path='BaseIndexer', recv_from='prep', copy_flow=False)
         .add_router(name='sync', yaml_path='BaseReduceRouter',
                     num_part=2, recv_from=['vec_idx', 'doc_idx'], copy_flow=False))


class BaseQueryFlow(Flow):
    """
    BaseQueryFlow defines a common service pipeline when querying:
    preprocessor -> encoder -> vector indexer -> score router -> doc indexer.

    It cannot be directly used, as all services are using the base module by default.
    You have to use :py:meth:`set` to change the `yaml_path` of each service.
    """

    def __init__(self, *args, **kwargs):
        """
        Build the default query topology.

        All positional and keyword arguments are forwarded to :py:meth:`Flow.__init__`.
        """
        super().__init__(*args, **kwargs)
        # NOTE(review): copy_flow=False presumably makes each add_* call mutate
        # this flow in place (returning self) rather than returning a copy, so
        # the whole chain builds up a single flow object — confirm in Flow.add.
        # 'scorer' reduces chunk-level matches to doc-level before the final
        # doc-indexer lookup.
        (self.add_preprocessor(name='prep', yaml_path='BasePreprocessor', copy_flow=False)
         .add_encoder(name='enc', yaml_path='BaseEncoder', copy_flow=False)
         .add_indexer(name='vec_idx', yaml_path='BaseIndexer', copy_flow=False)
         .add_router(name='scorer', yaml_path='Chunk2DocTopkReducer', copy_flow=False)
         .add_indexer(name='doc_idx', yaml_path='BaseIndexer', copy_flow=False))
19 changes: 0 additions & 19 deletions gnes/flow/common.py

This file was deleted.

120 changes: 60 additions & 60 deletions tests/test_gnes_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
import unittest

from gnes.cli.parser import set_client_cli_parser
from gnes.flow import Flow, Service as gfs, FlowBuildLevelMismatch
from gnes.flow.common import BaseIndexFlow, BaseQueryFlow
from gnes.flow import Flow, FlowBuildLevelMismatch
from gnes.flow.base import BaseIndexFlow, BaseQueryFlow


class TestGNESFlow(unittest.TestCase):
Expand Down Expand Up @@ -38,15 +38,15 @@ def tearDown(self):

def test_flow1(self):
f = (Flow(check_version=False, route_table=True)
.add(gfs.Router, yaml_path='BaseRouter'))
g = f.add(gfs.Router, yaml_path='BaseRouter')
.add_router(yaml_path='BaseRouter'))
g = f.add_router(yaml_path='BaseRouter')

print('f: %r g: %r' % (f, g))
g.build()
print(g.to_mermaid())

f = f.add(gfs.Router, yaml_path='BaseRouter')
g = g.add(gfs.Router, yaml_path='BaseRouter')
f = f.add_router(yaml_path='BaseRouter')
g = g.add_router(yaml_path='BaseRouter')

print('f: %r g: %r' % (f, g))
f.build()
Expand All @@ -55,76 +55,76 @@ def test_flow1(self):

def test_flow1_ctx_empty(self):
f = (Flow(check_version=False, route_table=True)
.add(gfs.Router, yaml_path='BaseRouter'))
.add_router(yaml_path='BaseRouter'))
with f(backend='process'):
pass

def test_flow1_ctx(self):
flow = (Flow(check_version=False, route_table=False)
.add(gfs.Router, yaml_path='BaseRouter'))
.add_router(yaml_path='BaseRouter'))
with flow(backend='process', copy_flow=True) as f, open(self.test_file) as fp:
f.index(txt_file=self.test_file, batch_size=4)
f.train(txt_file=self.test_file, batch_size=4)

with flow(backend='process', copy_flow=True) as f:
# change the flow inside build shall fail
f = f.add(gfs.Router, yaml_path='BaseRouter')
f = f.add_router(yaml_path='BaseRouter')
self.assertRaises(FlowBuildLevelMismatch, f.index, txt_file=self.test_file, batch_size=4)

print(flow.build(backend=None).to_mermaid())

def test_flow2(self):
f = (Flow(check_version=False, route_table=True)
.add(gfs.Router, yaml_path='BaseRouter')
.add(gfs.Router, yaml_path='BaseRouter')
.add(gfs.Router, yaml_path='BaseRouter')
.add(gfs.Router, yaml_path='BaseRouter')
.add(gfs.Router, yaml_path='BaseRouter')
.add(gfs.Router, yaml_path='BaseRouter')
.add(gfs.Router, yaml_path='BaseRouter')
.add(gfs.Router, yaml_path='BaseRouter')
.add_router(yaml_path='BaseRouter')
.add_router(yaml_path='BaseRouter')
.add_router(yaml_path='BaseRouter')
.add_router(yaml_path='BaseRouter')
.add_router(yaml_path='BaseRouter')
.add_router(yaml_path='BaseRouter')
.add_router(yaml_path='BaseRouter')
.add_router(yaml_path='BaseRouter')
.build(backend=None))
print(f._service_edges)
print(f.to_mermaid())

def test_flow3(self):
f = (Flow(check_version=False, route_table=True)
.add(gfs.Router, name='r0', send_to=gfs.Frontend, yaml_path='BaseRouter')
.add(gfs.Router, name='r1', recv_from=gfs.Frontend, yaml_path='BaseRouter')
.add_router(name='r0', send_to=Flow.Frontend, yaml_path='BaseRouter')
.add_router(name='r1', recv_from=Flow.Frontend, yaml_path='BaseRouter')
.build(backend=None))
print(f._service_edges)
print(f.to_mermaid())

def test_flow4(self):
f = (Flow(check_version=False, route_table=True)
.add(gfs.Router, name='r0', yaml_path='BaseRouter')
.add(gfs.Router, name='r1', recv_from=gfs.Frontend, yaml_path='BaseRouter')
.add(gfs.Router, name='reduce', recv_from=['r0', 'r1'], yaml_path='BaseRouter')
.add_router(name='r0', yaml_path='BaseRouter')
.add_router(name='r1', recv_from=Flow.Frontend, yaml_path='BaseRouter')
.add_router(name='reduce', recv_from=['r0', 'r1'], yaml_path='BaseRouter')
.build(backend=None))
print(f._service_edges)
print(f.to_mermaid())

def test_flow5(self):
f = (Flow(check_version=False, route_table=True)
.add(gfs.Preprocessor, name='prep', yaml_path='SentSplitPreprocessor')
.add(gfs.Encoder, yaml_path='PyTorchTransformers')
.add(gfs.Indexer, name='vec_idx', yaml_path='NumpyIndexer')
.add(gfs.Indexer, name='doc_idx', yaml_path='DictIndexer', recv_from='prep')
.add(gfs.Router, name='sync_barrier', yaml_path='BaseReduceRouter',
num_part=2, recv_from=['vec_idx', 'doc_idx'])
.add_preprocessor(name='prep', yaml_path='SentSplitPreprocessor')
.add_encoder(yaml_path='PyTorchTransformers')
.add_indexer(name='vec_idx', yaml_path='NumpyIndexer')
.add_indexer(name='doc_idx', yaml_path='DictIndexer', recv_from='prep')
.add_router(name='sync_barrier', yaml_path='BaseReduceRouter',
num_part=2, recv_from=['vec_idx', 'doc_idx'])
.build(backend=None))
print(f._service_edges)
print(f.to_mermaid())
# f.to_jpg()

def test_flow_replica_pot(self):
f = (Flow(check_version=False, route_table=True)
.add(gfs.Preprocessor, name='prep', yaml_path='SentSplitPreprocessor', replicas=4)
.add(gfs.Encoder, yaml_path='PyTorchTransformers', replicas=3)
.add(gfs.Indexer, name='vec_idx', yaml_path='NumpyIndexer', replicas=2)
.add(gfs.Indexer, name='doc_idx', yaml_path='DictIndexer', recv_from='prep', replicas=2)
.add(gfs.Router, name='sync_barrier', yaml_path='BaseReduceRouter',
num_part=2, recv_from=['vec_idx', 'doc_idx'])
.add_preprocessor(name='prep', yaml_path='SentSplitPreprocessor', replicas=4)
.add_encoder(yaml_path='PyTorchTransformers', replicas=3)
.add_indexer(name='vec_idx', yaml_path='NumpyIndexer', replicas=2)
.add_indexer(name='doc_idx', yaml_path='DictIndexer', recv_from='prep', replicas=2)
.add_router(name='sync_barrier', yaml_path='BaseReduceRouter',
num_part=2, recv_from=['vec_idx', 'doc_idx'])
.build(backend=None))
print(f.to_mermaid())
print(f.to_url(left_right=False))
Expand All @@ -135,13 +135,13 @@ def _test_index_flow(self, backend):
self.assertFalse(os.path.exists(k))

flow = (Flow(check_version=False, route_table=False)
.add(gfs.Preprocessor, name='prep', yaml_path='SentSplitPreprocessor')
.add(gfs.Encoder, yaml_path=os.path.join(self.dirname, 'yaml/flow-transformer.yml'), replicas=3)
.add(gfs.Indexer, name='vec_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-vecindex.yml'))
.add(gfs.Indexer, name='doc_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-dictindex.yml'),
recv_from='prep')
.add(gfs.Router, name='sync_barrier', yaml_path='BaseReduceRouter',
num_part=2, recv_from=['vec_idx', 'doc_idx']))
.add_preprocessor(name='prep', yaml_path='SentSplitPreprocessor')
.add_encoder(yaml_path=os.path.join(self.dirname, 'yaml/flow-transformer.yml'), replicas=3)
.add_indexer(name='vec_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-vecindex.yml'))
.add_indexer(name='doc_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-dictindex.yml'),
recv_from='prep')
.add_router(name='sync_barrier', yaml_path='BaseReduceRouter',
num_part=2, recv_from=['vec_idx', 'doc_idx']))

with flow.build(backend=backend) as f:
f.index(txt_file=self.test_file, batch_size=20)
Expand All @@ -151,11 +151,11 @@ def _test_index_flow(self, backend):

def _test_query_flow(self, backend):
flow = (Flow(check_version=False, route_table=False)
.add(gfs.Preprocessor, name='prep', yaml_path='SentSplitPreprocessor')
.add(gfs.Encoder, yaml_path=os.path.join(self.dirname, 'yaml/flow-transformer.yml'), replicas=3)
.add(gfs.Indexer, name='vec_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-vecindex.yml'))
.add(gfs.Router, name='scorer', yaml_path=os.path.join(self.dirname, 'yaml/flow-score.yml'))
.add(gfs.Indexer, name='doc_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-dictindex.yml')))
.add_preprocessor(name='prep', yaml_path='SentSplitPreprocessor')
.add_encoder(yaml_path=os.path.join(self.dirname, 'yaml/flow-transformer.yml'), replicas=3)
.add_indexer(name='vec_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-vecindex.yml'))
.add_router(name='scorer', yaml_path=os.path.join(self.dirname, 'yaml/flow-score.yml'))
.add_indexer(name='doc_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-dictindex.yml')))

with flow.build(backend=backend) as f, open(self.test_file, encoding='utf8') as fp:
f.query(bytes_gen=[v.encode() for v in fp][:3])
Expand All @@ -171,22 +171,22 @@ def test_indexe_query_flow_proc(self):

def test_query_flow_plot(self):
flow = (Flow(check_version=False, route_table=False)
.add(gfs.Preprocessor, name='prep', yaml_path='SentSplitPreprocessor', replicas=2)
.add(gfs.Encoder, yaml_path=os.path.join(self.dirname, 'yaml/flow-transformer.yml'), replicas=3)
.add(gfs.Indexer, name='vec_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-vecindex.yml'),
replicas=4)
.add(gfs.Router, name='scorer', yaml_path=os.path.join(self.dirname, 'yaml/flow-score.yml'))
.add(gfs.Indexer, name='doc_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-dictindex.yml')))
.add_preprocessor(name='prep', yaml_path='SentSplitPreprocessor', replicas=2)
.add_encoder(yaml_path=os.path.join(self.dirname, 'yaml/flow-transformer.yml'), replicas=3)
.add_indexer(name='vec_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-vecindex.yml'),
replicas=4)
.add_router(name='scorer', yaml_path=os.path.join(self.dirname, 'yaml/flow-score.yml'))
.add_indexer(name='doc_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-dictindex.yml')))
print(flow.build(backend=None).to_url())

def test_flow_add_set(self):
f = (Flow(check_version=False, route_table=True)
.add(gfs.Preprocessor, name='prep', yaml_path='SentSplitPreprocessor', replicas=4)
.add(gfs.Encoder, yaml_path='PyTorchTransformers', replicas=3)
.add(gfs.Indexer, name='vec_idx', yaml_path='NumpyIndexer', replicas=2)
.add(gfs.Indexer, name='doc_idx', yaml_path='DictIndexer', recv_from='prep', replicas=2)
.add(gfs.Router, name='sync_barrier', yaml_path='BaseReduceRouter',
num_part=2, recv_from=['vec_idx', 'doc_idx'])
.add_preprocessor(name='prep', yaml_path='SentSplitPreprocessor', replicas=4)
.add_encoder(yaml_path='PyTorchTransformers', replicas=3)
.add_indexer(name='vec_idx', yaml_path='NumpyIndexer', replicas=2)
.add_indexer(name='doc_idx', yaml_path='DictIndexer', recv_from='prep', replicas=2)
.add_router(name='sync_barrier', yaml_path='BaseReduceRouter',
num_part=2, recv_from=['vec_idx', 'doc_idx'])
.build(backend=None))

print(f.to_url())
Expand Down Expand Up @@ -229,5 +229,5 @@ def test_flow_add_set(self):
print(f1.to_swarm_yaml())

def test_common_flow(self):
print(BaseIndexFlow.build(backend=None).to_url())
print(BaseQueryFlow.build(backend=None).to_url())
print(BaseIndexFlow().build(backend=None).to_url())
print(BaseQueryFlow().build(backend=None).to_url())

0 comments on commit ca9ad83

Please sign in to comment.