Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

fix(flow): use recommend flow api to reduce confusion #337

Merged
merged 2 commits into from
Oct 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions gnes/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ class Flow(TrainableBase):
.. highlight:: python
.. code-block:: python

from gnes.flow import Flow, Service as gfs
from gnes.flow import Flow

f = (Flow(check_version=False, route_table=True)
.add(gfs.Preprocessor, yaml_path='BasePreprocessor')
.add(gfs.Encoder, yaml_path='BaseEncoder')
.add(gfs.Router, yaml_path='BaseRouter'))
.add_preprocessor(yaml_path='BasePreprocessor')
.add_encoder(yaml_path='BaseEncoder')
.add_router(yaml_path='BaseRouter'))

with f.build(backend='thread') as flow:
flow.index()
Expand All @@ -40,6 +40,9 @@ class Flow(TrainableBase):

"""

# a shortcut to the service frontend, removing one extra import
Frontend = Service.Frontend

def __init__(self, with_frontend: bool = True, is_trained: bool = True, *args, **kwargs):
"""
Create a new Flow object.
Expand Down Expand Up @@ -506,6 +509,10 @@ def add(self, service: Union['Service', str],
Add a service to the current flow object and return the new modified flow object.
The attribute of the service can be later changed with :py:meth:`set` or deleted with :py:meth:`remove`

Note there are shortcut versions of this method.
Recommend to use :py:meth:`add_encoder`, :py:meth:`add_preprocessor`,
:py:meth:`add_router`, :py:meth:`add_indexer` whenever possible.

:param service: a 'Service' enum or string, possible choices: Encoder, Router, Preprocessor, Indexer, Frontend
:param name: the name identifier of the service, can be used in 'recv_from',
'send_to', :py:meth:`set` and :py:meth:`remove`.
Expand Down
36 changes: 36 additions & 0 deletions gnes/flow/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from . import Flow


class BaseIndexFlow(Flow):
    """
    A common service pipeline for indexing:
    preprocess -> encode -> (vector index | doc index) -> sync reduce.

    It can not be directly used as all services are using the base module by default.
    You have to use :py:meth:`set` to change the `yaml_path` of each service.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # The pipeline as (method name, keyword arguments) pairs; every step
        # is applied with copy_flow=False so this very instance is mutated
        # instead of a modified copy being returned.
        pipeline = (
            ('add_preprocessor', dict(name='prep', yaml_path='BasePreprocessor')),
            ('add_encoder', dict(name='enc', yaml_path='BaseEncoder')),
            ('add_indexer', dict(name='vec_idx', yaml_path='BaseIndexer')),
            # the doc indexer branches off the preprocessor output
            ('add_indexer', dict(name='doc_idx', yaml_path='BaseIndexer',
                                 recv_from='prep')),
            # join the two indexer branches back together
            ('add_router', dict(name='sync', yaml_path='BaseReduceRouter',
                                num_part=2, recv_from=['vec_idx', 'doc_idx'])),
        )
        flow = self
        for method_name, params in pipeline:
            flow = getattr(flow, method_name)(copy_flow=False, **params)


class BaseQueryFlow(Flow):
    """
    BaseQueryFlow defines a common service pipeline when querying:
    preprocess -> encode -> vector index -> score -> doc index.

    It can not be directly used as all services are using the base module by default.
    You have to use :py:meth:`set` to change the `yaml_path` of each service.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # copy_flow=False mutates this flow in place on every step, so the
        # chain below builds the pipeline directly into `self`.
        (self.add_preprocessor(name='prep', yaml_path='BasePreprocessor', copy_flow=False)
         .add_encoder(name='enc', yaml_path='BaseEncoder', copy_flow=False)
         .add_indexer(name='vec_idx', yaml_path='BaseIndexer', copy_flow=False)
         .add_router(name='scorer', yaml_path='Chunk2DocTopkReducer', copy_flow=False)
         .add_indexer(name='doc_idx', yaml_path='BaseIndexer', copy_flow=False))
19 changes: 0 additions & 19 deletions gnes/flow/common.py

This file was deleted.

120 changes: 60 additions & 60 deletions tests/test_gnes_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
import unittest

from gnes.cli.parser import set_client_cli_parser
from gnes.flow import Flow, Service as gfs, FlowBuildLevelMismatch
from gnes.flow.common import BaseIndexFlow, BaseQueryFlow
from gnes.flow import Flow, FlowBuildLevelMismatch
from gnes.flow.base import BaseIndexFlow, BaseQueryFlow


class TestGNESFlow(unittest.TestCase):
Expand Down Expand Up @@ -38,15 +38,15 @@ def tearDown(self):

def test_flow1(self):
f = (Flow(check_version=False, route_table=True)
.add(gfs.Router, yaml_path='BaseRouter'))
g = f.add(gfs.Router, yaml_path='BaseRouter')
.add_router(yaml_path='BaseRouter'))
g = f.add_router(yaml_path='BaseRouter')

print('f: %r g: %r' % (f, g))
g.build()
print(g.to_mermaid())

f = f.add(gfs.Router, yaml_path='BaseRouter')
g = g.add(gfs.Router, yaml_path='BaseRouter')
f = f.add_router(yaml_path='BaseRouter')
g = g.add_router(yaml_path='BaseRouter')

print('f: %r g: %r' % (f, g))
f.build()
Expand All @@ -55,76 +55,76 @@ def test_flow1(self):

def test_flow1_ctx_empty(self):
f = (Flow(check_version=False, route_table=True)
.add(gfs.Router, yaml_path='BaseRouter'))
.add_router(yaml_path='BaseRouter'))
with f(backend='process'):
pass

def test_flow1_ctx(self):
flow = (Flow(check_version=False, route_table=False)
.add(gfs.Router, yaml_path='BaseRouter'))
.add_router(yaml_path='BaseRouter'))
with flow(backend='process', copy_flow=True) as f, open(self.test_file) as fp:
f.index(txt_file=self.test_file, batch_size=4)
f.train(txt_file=self.test_file, batch_size=4)

with flow(backend='process', copy_flow=True) as f:
# change the flow inside build shall fail
f = f.add(gfs.Router, yaml_path='BaseRouter')
f = f.add_router(yaml_path='BaseRouter')
self.assertRaises(FlowBuildLevelMismatch, f.index, txt_file=self.test_file, batch_size=4)

print(flow.build(backend=None).to_mermaid())

def test_flow2(self):
f = (Flow(check_version=False, route_table=True)
.add(gfs.Router, yaml_path='BaseRouter')
.add(gfs.Router, yaml_path='BaseRouter')
.add(gfs.Router, yaml_path='BaseRouter')
.add(gfs.Router, yaml_path='BaseRouter')
.add(gfs.Router, yaml_path='BaseRouter')
.add(gfs.Router, yaml_path='BaseRouter')
.add(gfs.Router, yaml_path='BaseRouter')
.add(gfs.Router, yaml_path='BaseRouter')
.add_router(yaml_path='BaseRouter')
.add_router(yaml_path='BaseRouter')
.add_router(yaml_path='BaseRouter')
.add_router(yaml_path='BaseRouter')
.add_router(yaml_path='BaseRouter')
.add_router(yaml_path='BaseRouter')
.add_router(yaml_path='BaseRouter')
.add_router(yaml_path='BaseRouter')
.build(backend=None))
print(f._service_edges)
print(f.to_mermaid())

def test_flow3(self):
f = (Flow(check_version=False, route_table=True)
.add(gfs.Router, name='r0', send_to=gfs.Frontend, yaml_path='BaseRouter')
.add(gfs.Router, name='r1', recv_from=gfs.Frontend, yaml_path='BaseRouter')
.add_router(name='r0', send_to=Flow.Frontend, yaml_path='BaseRouter')
.add_router(name='r1', recv_from=Flow.Frontend, yaml_path='BaseRouter')
.build(backend=None))
print(f._service_edges)
print(f.to_mermaid())

def test_flow4(self):
f = (Flow(check_version=False, route_table=True)
.add(gfs.Router, name='r0', yaml_path='BaseRouter')
.add(gfs.Router, name='r1', recv_from=gfs.Frontend, yaml_path='BaseRouter')
.add(gfs.Router, name='reduce', recv_from=['r0', 'r1'], yaml_path='BaseRouter')
.add_router(name='r0', yaml_path='BaseRouter')
.add_router(name='r1', recv_from=Flow.Frontend, yaml_path='BaseRouter')
.add_router(name='reduce', recv_from=['r0', 'r1'], yaml_path='BaseRouter')
.build(backend=None))
print(f._service_edges)
print(f.to_mermaid())

def test_flow5(self):
f = (Flow(check_version=False, route_table=True)
.add(gfs.Preprocessor, name='prep', yaml_path='SentSplitPreprocessor')
.add(gfs.Encoder, yaml_path='PyTorchTransformers')
.add(gfs.Indexer, name='vec_idx', yaml_path='NumpyIndexer')
.add(gfs.Indexer, name='doc_idx', yaml_path='DictIndexer', recv_from='prep')
.add(gfs.Router, name='sync_barrier', yaml_path='BaseReduceRouter',
num_part=2, recv_from=['vec_idx', 'doc_idx'])
.add_preprocessor(name='prep', yaml_path='SentSplitPreprocessor')
.add_encoder(yaml_path='PyTorchTransformers')
.add_indexer(name='vec_idx', yaml_path='NumpyIndexer')
.add_indexer(name='doc_idx', yaml_path='DictIndexer', recv_from='prep')
.add_router(name='sync_barrier', yaml_path='BaseReduceRouter',
num_part=2, recv_from=['vec_idx', 'doc_idx'])
.build(backend=None))
print(f._service_edges)
print(f.to_mermaid())
# f.to_jpg()

def test_flow_replica_pot(self):
f = (Flow(check_version=False, route_table=True)
.add(gfs.Preprocessor, name='prep', yaml_path='SentSplitPreprocessor', replicas=4)
.add(gfs.Encoder, yaml_path='PyTorchTransformers', replicas=3)
.add(gfs.Indexer, name='vec_idx', yaml_path='NumpyIndexer', replicas=2)
.add(gfs.Indexer, name='doc_idx', yaml_path='DictIndexer', recv_from='prep', replicas=2)
.add(gfs.Router, name='sync_barrier', yaml_path='BaseReduceRouter',
num_part=2, recv_from=['vec_idx', 'doc_idx'])
.add_preprocessor(name='prep', yaml_path='SentSplitPreprocessor', replicas=4)
.add_encoder(yaml_path='PyTorchTransformers', replicas=3)
.add_indexer(name='vec_idx', yaml_path='NumpyIndexer', replicas=2)
.add_indexer(name='doc_idx', yaml_path='DictIndexer', recv_from='prep', replicas=2)
.add_router(name='sync_barrier', yaml_path='BaseReduceRouter',
num_part=2, recv_from=['vec_idx', 'doc_idx'])
.build(backend=None))
print(f.to_mermaid())
print(f.to_url(left_right=False))
Expand All @@ -135,13 +135,13 @@ def _test_index_flow(self, backend):
self.assertFalse(os.path.exists(k))

flow = (Flow(check_version=False, route_table=False)
.add(gfs.Preprocessor, name='prep', yaml_path='SentSplitPreprocessor')
.add(gfs.Encoder, yaml_path=os.path.join(self.dirname, 'yaml/flow-transformer.yml'), replicas=3)
.add(gfs.Indexer, name='vec_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-vecindex.yml'))
.add(gfs.Indexer, name='doc_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-dictindex.yml'),
recv_from='prep')
.add(gfs.Router, name='sync_barrier', yaml_path='BaseReduceRouter',
num_part=2, recv_from=['vec_idx', 'doc_idx']))
.add_preprocessor(name='prep', yaml_path='SentSplitPreprocessor')
.add_encoder(yaml_path=os.path.join(self.dirname, 'yaml/flow-transformer.yml'), replicas=3)
.add_indexer(name='vec_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-vecindex.yml'))
.add_indexer(name='doc_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-dictindex.yml'),
recv_from='prep')
.add_router(name='sync_barrier', yaml_path='BaseReduceRouter',
num_part=2, recv_from=['vec_idx', 'doc_idx']))

with flow.build(backend=backend) as f:
f.index(txt_file=self.test_file, batch_size=20)
Expand All @@ -151,11 +151,11 @@ def _test_index_flow(self, backend):

def _test_query_flow(self, backend):
flow = (Flow(check_version=False, route_table=False)
.add(gfs.Preprocessor, name='prep', yaml_path='SentSplitPreprocessor')
.add(gfs.Encoder, yaml_path=os.path.join(self.dirname, 'yaml/flow-transformer.yml'), replicas=3)
.add(gfs.Indexer, name='vec_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-vecindex.yml'))
.add(gfs.Router, name='scorer', yaml_path=os.path.join(self.dirname, 'yaml/flow-score.yml'))
.add(gfs.Indexer, name='doc_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-dictindex.yml')))
.add_preprocessor(name='prep', yaml_path='SentSplitPreprocessor')
.add_encoder(yaml_path=os.path.join(self.dirname, 'yaml/flow-transformer.yml'), replicas=3)
.add_indexer(name='vec_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-vecindex.yml'))
.add_router(name='scorer', yaml_path=os.path.join(self.dirname, 'yaml/flow-score.yml'))
.add_indexer(name='doc_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-dictindex.yml')))

with flow.build(backend=backend) as f, open(self.test_file, encoding='utf8') as fp:
f.query(bytes_gen=[v.encode() for v in fp][:3])
Expand All @@ -171,22 +171,22 @@ def test_indexe_query_flow_proc(self):

def test_query_flow_plot(self):
flow = (Flow(check_version=False, route_table=False)
.add(gfs.Preprocessor, name='prep', yaml_path='SentSplitPreprocessor', replicas=2)
.add(gfs.Encoder, yaml_path=os.path.join(self.dirname, 'yaml/flow-transformer.yml'), replicas=3)
.add(gfs.Indexer, name='vec_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-vecindex.yml'),
replicas=4)
.add(gfs.Router, name='scorer', yaml_path=os.path.join(self.dirname, 'yaml/flow-score.yml'))
.add(gfs.Indexer, name='doc_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-dictindex.yml')))
.add_preprocessor(name='prep', yaml_path='SentSplitPreprocessor', replicas=2)
.add_encoder(yaml_path=os.path.join(self.dirname, 'yaml/flow-transformer.yml'), replicas=3)
.add_indexer(name='vec_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-vecindex.yml'),
replicas=4)
.add_router(name='scorer', yaml_path=os.path.join(self.dirname, 'yaml/flow-score.yml'))
.add_indexer(name='doc_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-dictindex.yml')))
print(flow.build(backend=None).to_url())

def test_flow_add_set(self):
f = (Flow(check_version=False, route_table=True)
.add(gfs.Preprocessor, name='prep', yaml_path='SentSplitPreprocessor', replicas=4)
.add(gfs.Encoder, yaml_path='PyTorchTransformers', replicas=3)
.add(gfs.Indexer, name='vec_idx', yaml_path='NumpyIndexer', replicas=2)
.add(gfs.Indexer, name='doc_idx', yaml_path='DictIndexer', recv_from='prep', replicas=2)
.add(gfs.Router, name='sync_barrier', yaml_path='BaseReduceRouter',
num_part=2, recv_from=['vec_idx', 'doc_idx'])
.add_preprocessor(name='prep', yaml_path='SentSplitPreprocessor', replicas=4)
.add_encoder(yaml_path='PyTorchTransformers', replicas=3)
.add_indexer(name='vec_idx', yaml_path='NumpyIndexer', replicas=2)
.add_indexer(name='doc_idx', yaml_path='DictIndexer', recv_from='prep', replicas=2)
.add_router(name='sync_barrier', yaml_path='BaseReduceRouter',
num_part=2, recv_from=['vec_idx', 'doc_idx'])
.build(backend=None))

print(f.to_url())
Expand Down Expand Up @@ -229,5 +229,5 @@ def test_flow_add_set(self):
print(f1.to_swarm_yaml())

def test_common_flow(self):
print(BaseIndexFlow.build(backend=None).to_url())
print(BaseQueryFlow.build(backend=None).to_url())
print(BaseIndexFlow().build(backend=None).to_url())
print(BaseQueryFlow().build(backend=None).to_url())