Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[patch] add agent auto save indexing feature #385

Merged
merged 6 commits into from
May 18, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions charts/vald/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,8 @@ agent:
auto_index_check_duration: 30m
# agent.ngt.auto_index_length -- number of cache to trigger automatic indexing
auto_index_length: 100
# agent.ngt.auto_save_index_duration -- interval between automatic index saves
auto_save_index_duration: 35m
# agent.ngt.dimension -- vector dimension
dimension: 4096
# agent.ngt.bulk_insert_chunk_size -- bulk insert chunk size
Expand Down
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ replace (
github.com/coreos/etcd => go.etcd.io/etcd v0.5.0-alpha.5.0.20200425165423-262c93980547
github.com/docker/docker => github.com/moby/moby v17.12.0-ce-rc1.0.20200309214505-aa6a9891b09c+incompatible
github.com/envoyproxy/protoc-gen-validate => github.com/envoyproxy/protoc-gen-validate v0.3.0-java
github.com/go-sql-driver/mysql => github.com/go-sql-driver/mysql v1.5.1-0.20200513234351-f378f59f6710
github.com/gocql/gocql => github.com/gocql/gocql v0.0.0-20200511135441-57b003a04490
github.com/go-sql-driver/mysql => github.com/go-sql-driver/mysql v1.5.1-0.20200517154853-096feaaf8e9f
github.com/gocql/gocql => github.com/gocql/gocql v0.0.0-20200515162754-0714040f3e35
github.com/gogo/protobuf => github.com/gogo/protobuf v1.3.1
github.com/gophercloud/gophercloud => github.com/gophercloud/gophercloud v0.10.0
github.com/gorilla/mux => github.com/gorilla/mux v1.7.4
github.com/gophercloud/gophercloud => github.com/gophercloud/gophercloud v0.11.0
github.com/gorilla/mux => github.com/gorilla/mux v1.7.5-0.20200517040254-948bec34b516
github.com/gorilla/websocket => github.com/gorilla/websocket v1.4.2
github.com/tensorflow/tensorflow => github.com/tensorflow/tensorflow v2.1.0+incompatible
golang.org/x/crypto => golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37
Expand Down Expand Up @@ -59,7 +59,7 @@ require (
gonum.org/v1/hdf5 v0.0.0-20200504100616-496fefe91614
gonum.org/v1/netlib v0.0.0-20200317120129-c5a04cffd98a // indirect
gonum.org/v1/plot v0.7.0
google.golang.org/genproto v0.0.0-20200514193133-8feb7f20f2a2
google.golang.org/genproto v0.0.0-20200515170657-fc4c6c6a6587
google.golang.org/grpc v1.29.1
gopkg.in/yaml.v2 v2.3.0
k8s.io/api v0.18.2
Expand Down
20 changes: 9 additions & 11 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,14 @@ github.com/go-redis/redis v6.15.2+incompatible h1:9SpNVG76gr6InJGxoZ6IuuxaCOQwDA
github.com/go-redis/redis v6.15.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redis/v7 v7.2.0 h1:CrCexy/jYWZjW0AyVoHlcJUeZN19VWlbepTh1Vq6dJs=
github.com/go-redis/redis/v7 v7.2.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=
github.com/go-sql-driver/mysql v1.5.1-0.20200513234351-f378f59f6710 h1:AHajtDLQUi2wmBQbhsvy7imZ9CRaRaSTkPTruDOGUW0=
github.com/go-sql-driver/mysql v1.5.1-0.20200513234351-f378f59f6710/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-sql-driver/mysql v1.5.1-0.20200517154853-096feaaf8e9f h1:lGOphibtewuHJ9+UQFUs1rRyEcbIyD3jbmnDk+pRbjo=
github.com/go-sql-driver/mysql v1.5.1-0.20200517154853-096feaaf8e9f/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=
github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
github.com/gocql/gocql v0.0.0-20200511135441-57b003a04490 h1:DE7b84SsUn+1Y3LG4YpFn7KpUra0iuYwS6PkSayorcg=
github.com/gocql/gocql v0.0.0-20200511135441-57b003a04490/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY=
github.com/gocql/gocql v0.0.0-20200515162754-0714040f3e35 h1:5kRvQfb7q17Og+HNo8NXHjcbkyEqJp75ZzXHfsa2JvE=
github.com/gocql/gocql v0.0.0-20200515162754-0714040f3e35/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY=
github.com/gocraft/dbr/v2 v2.7.0 h1:x+UnhSBYPFBBdtikLSMLQ9KPuquSUj4yBijsQAhhNZo=
github.com/gocraft/dbr/v2 v2.7.0/go.mod h1:wQdbxPBSloo2OlSedMxfNW0mgk0GXys9O1VFmQiwcx4=
github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
Expand Down Expand Up @@ -243,10 +243,10 @@ github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d/go.mod h1:sJBsC
github.com/googleapis/gnostic v0.1.0/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY=
github.com/googleapis/gnostic v0.3.1 h1:WeAefnSUHlBb0iJKwxFDZdbfGwkd7xRNuV+IpXMJhYk=
github.com/googleapis/gnostic v0.3.1/go.mod h1:on+2t9HRStVgn95RSsFWFz+6Q0Snyqv1awfrALZdbtU=
github.com/gophercloud/gophercloud v0.10.0/go.mod h1:gmC5oQqMDOMO1t1gq5DquX/yAU808e/4mzjjDA76+Ss=
github.com/gophercloud/gophercloud v0.11.0/go.mod h1:gmC5oQqMDOMO1t1gq5DquX/yAU808e/4mzjjDA76+Ss=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/mux v1.7.4 h1:VuZ8uybHlWmqV03+zRzdwKL4tUnIp1MAQtp1mIFE1bc=
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/mux v1.7.5-0.20200517040254-948bec34b516 h1:BU5W11a5nPsU82VaJwaimHGO/Z50RGtaraAXe4srksU=
github.com/gorilla/mux v1.7.5-0.20200517040254-948bec34b516/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
Expand Down Expand Up @@ -641,8 +641,6 @@ golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191203134012-c197fd4bf371/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200515010526-7d3b6ebf133d h1:n6zwymXmN9rCClNNmCWwV3qwMmBcRw/WeIGDK8Qnzk4=
golang.org/x/tools v0.0.0-20200515010526-7d3b6ebf133d/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200515220128-d3bf790afa53 h1:vmsb6v0zUdmUlXfwKaYrHPPRCV0lHq/IwNIf0ASGjyQ=
golang.org/x/tools v0.0.0-20200515220128-d3bf790afa53/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down Expand Up @@ -676,8 +674,8 @@ google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRn
google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200514193133-8feb7f20f2a2 h1:RwW6+LxyOQJ7oeoZ76GIJlwt/O0J5cN2fk+q/jK27kQ=
google.golang.org/genproto v0.0.0-20200514193133-8feb7f20f2a2/go.mod h1:YsZOwe1myG/8QRHRsmBRE1LrgQY60beZKjly0O1fX9U=
google.golang.org/genproto v0.0.0-20200515170657-fc4c6c6a6587 h1:1Ym+vvUpq1ZHvxzn34gENJX8U4aKO+vhy2P/2+Xl6qQ=
google.golang.org/genproto v0.0.0-20200515170657-fc4c6c6a6587/go.mod h1:YsZOwe1myG/8QRHRsmBRE1LrgQY60beZKjly0O1fX9U=
google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
Expand Down
4 changes: 2 additions & 2 deletions hack/benchmark/e2e/agent/ngt/ngt_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"github.com/vdaas/vald/hack/benchmark/internal/e2e"
"github.com/vdaas/vald/hack/benchmark/internal/e2e/strategy"
"github.com/vdaas/vald/hack/benchmark/internal/starter/agent/ngt"
"github.com/vdaas/vald/internal/client/agent/ngt/grpc"
"github.com/vdaas/vald/internal/client/agent/ngt/rest"
"github.com/vdaas/vald/internal/client/agent/grpc"
"github.com/vdaas/vald/internal/client/agent/rest"
"github.com/vdaas/vald/internal/log"
)

Expand Down
2 changes: 1 addition & 1 deletion hack/go.mod.default
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ replace (
github.com/go-sql-driver/mysql => github.com/go-sql-driver/mysql master
github.com/gocql/gocql => github.com/gocql/gocql master
github.com/gogo/protobuf => github.com/gogo/protobuf master
github.com/gophercloud/gophercloud => github.com/gophercloud/gophercloud v0.10.0
github.com/gophercloud/gophercloud => github.com/gophercloud/gophercloud v0.11.0
github.com/gorilla/mux => github.com/gorilla/mux master
github.com/gorilla/websocket => github.com/gorilla/websocket master
github.com/tensorflow/tensorflow => github.com/tensorflow/tensorflow v2.1.0
Expand Down
5 changes: 5 additions & 0 deletions internal/config/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ type NGT struct {
// AutoIndexCheckDuration represents the interval of the loop that checks whether automatic indexing should run
AutoIndexCheckDuration string `yaml:"auto_index_check_duration" json:"auto_index_check_duration"`

// AutoSaveIndexDuration represents the interval between automatic index saves
AutoSaveIndexDuration string `yaml:"auto_save_index_duration" json:"auto_save_index_duration"`

// AutoIndexLength represents the insert count threshold that triggers automatic indexing
AutoIndexLength int `yaml:"auto_index_length" json:"auto_index_length"`

Expand All @@ -58,5 +61,7 @@ func (n *NGT) Bind() *NGT {
n.DistanceType = GetActualValue(n.DistanceType)
n.ObjectType = GetActualValue(n.ObjectType)
n.AutoIndexCheckDuration = GetActualValue(n.AutoIndexCheckDuration)
n.AutoIndexDurationLimit = GetActualValue(n.AutoIndexDurationLimit)
n.AutoSaveIndexDuration = GetActualValue(n.AutoSaveIndexDuration)
return n
}
3 changes: 3 additions & 0 deletions internal/core/ngt/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ func gen(isLoad bool, opts ...Option) (NGT, error) {

if isLoad {
err = n.open()
if err != nil {
err = n.create()
}
} else {
err = n.create()
}
Expand Down
2 changes: 1 addition & 1 deletion internal/core/ngt/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func WithDistanceTypeByString(dt string) Option {
d = Angle
case "hamming":
d = Hamming
case "cosine":
case "cosine", "cos":
d = Cosine
case "normalizedangle":
d = NormalizedAngle
Expand Down
101 changes: 64 additions & 37 deletions pkg/agent/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type ngt struct {
indexing atomic.Value
lim time.Duration // auto indexing time limit
dur time.Duration // auto indexing check duration
sdur time.Duration // auto save index check duration
dps uint32 // default pool size
ic uint64 // insert count
nocie uint64 // number of create index execution
Expand All @@ -79,6 +80,7 @@ type ngt struct {
kvs kvs.BidiMap
core core.NGT
dcd bool // disable commit daemon
inMem bool
}

type vcache struct {
Expand All @@ -92,9 +94,10 @@ const (

func New(cfg *config.NGT) (nn NGT, err error) {
n := new(ngt)
n.inMem = cfg.EnableInMemoryMode
cfg.IndexPath = strings.TrimSuffix(cfg.IndexPath, "/")
opts := []core.Option{
core.WithInMemoryMode(cfg.EnableInMemoryMode),
core.WithInMemoryMode(n.inMem),
core.WithIndexPath(cfg.IndexPath),
core.WithDimension(cfg.Dimension),
core.WithDistanceTypeByString(cfg.DistanceType),
Expand All @@ -104,13 +107,13 @@ func New(cfg *config.NGT) (nn NGT, err error) {
core.WithSearchEdgeSize(cfg.SearchEdgeSize),
}

if !cfg.EnableInMemoryMode && len(cfg.IndexPath) != 0 {
if !n.inMem && len(cfg.IndexPath) != 0 {
n.path = cfg.IndexPath
}

n.kvs = kvs.New()

if _, err = os.Stat(cfg.IndexPath); os.IsNotExist(err) {
if _, err = os.Stat(cfg.IndexPath); os.IsNotExist(err) || n.inMem {
n.core, err = core.New(opts...)
} else {
eg, _ := errgroup.New(context.Background())
Expand All @@ -119,7 +122,7 @@ func New(cfg *config.NGT) (nn NGT, err error) {
return err
}))
eg.Go(safety.RecoverFunc(func() (err error) {
if len(n.path) != 0 {
if len(n.path) != 0 && !n.inMem {
kpango marked this conversation as resolved.
Show resolved Hide resolved
m := make(map[string]uint32)
gob.Register(map[string]uint32{})
f := file.Open(n.path+"/"+kvsFileName, os.O_RDONLY|os.O_SYNC, os.ModePerm)
Expand Down Expand Up @@ -156,6 +159,14 @@ func New(cfg *config.NGT) (nn NGT, err error) {
n.lim = d
}

if cfg.AutoSaveIndexDuration != "" {
d, err := timeutil.Parse(cfg.AutoSaveIndexDuration)
if err != nil {
d = 0
}
n.sdur = d
}

n.alen = cfg.AutoIndexLength

n.eg = errgroup.Get()
Expand All @@ -182,30 +193,43 @@ func (n *ngt) Start(ctx context.Context) <-chan error {
return nil
}
ech := make(chan error, 2)
n.eg.Go(safety.RecoverFunc(func() error {
n.eg.Go(safety.RecoverFunc(func() (err error) {
if n.sdur == 0 {
n.sdur = n.dur + time.Second
}
if n.lim == 0 {
n.lim = n.dur * 2
}
defer close(ech)
tick := time.NewTicker(n.dur)
sTick := time.NewTicker(n.sdur)
limit := time.NewTicker(n.lim)
defer tick.Stop()
defer sTick.Stop()
defer limit.Stop()
for {
err = nil
select {
case <-ctx.Done():
err = n.CreateAndSaveIndex(ctx, n.dps)
if err != nil {
ech <- err
return errors.Wrap(ctx.Err(), err.Error())
}
return ctx.Err()
case <-tick.C:
if int(atomic.LoadUint64(&n.ic)) >= n.alen {
err := n.CreateIndex(n.dps)
if err != nil && err != errors.ErrUncommittedIndexNotFound {
ech <- err
runtime.Gosched()
}
err = n.CreateIndex(n.dps)
}
case <-limit.C:
err := n.CreateIndex(n.dps)
if err != nil && err != errors.ErrUncommittedIndexNotFound {
ech <- err
runtime.Gosched()
}
err = n.CreateAndSaveIndex(ctx, n.dps)
case <-sTick.C:
err = n.SaveIndex(ctx)
}
if err != nil && err != errors.ErrUncommittedIndexNotFound {
ech <- err
runtime.Gosched()
err = nil
}
}
}))
Expand Down Expand Up @@ -497,28 +521,31 @@ func (n *ngt) CreateIndex(poolSize uint32) (err error) {
}

func (n *ngt) SaveIndex(ctx context.Context) (err error) {
eg, ctx := errgroup.New(ctx)
eg.Go(safety.RecoverFunc(func() error {
if len(n.path) != 0 {
m := make(map[string]uint32, n.kvs.Len())
var mu sync.Mutex
n.kvs.Range(ctx, func(key string, id uint32) bool {
mu.Lock()
m[key] = id
mu.Unlock()
return true
})
f := file.Open(n.path+"/"+kvsFileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm)
defer f.Close()
gob.Register(map[string]uint32{})
return gob.NewEncoder(f).Encode(&m)
}
return nil
}))
eg.Go(safety.RecoverFunc(func() error {
return n.core.SaveIndex()
}))
return eg.Wait()
if len(n.path) != 0 && !n.inMem {
eg, ctx := errgroup.New(ctx)
eg.Go(safety.RecoverFunc(func() error {
if len(n.path) != 0 {
m := make(map[string]uint32, n.kvs.Len())
var mu sync.Mutex
n.kvs.Range(ctx, func(key string, id uint32) bool {
mu.Lock()
m[key] = id
mu.Unlock()
return true
})
f := file.Open(n.path+"/"+kvsFileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm)
defer f.Close()
gob.Register(map[string]uint32{})
return gob.NewEncoder(f).Encode(&m)
}
return nil
}))
eg.Go(safety.RecoverFunc(func() error {
return n.core.SaveIndex()
}))
err = eg.Wait()
}
return
}

func (n *ngt) CreateAndSaveIndex(ctx context.Context, poolSize uint32) (err error) {
Expand Down
8 changes: 7 additions & 1 deletion pkg/manager/index/service/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,19 @@ func (idx *index) execute(ctx context.Context, enableLowIndexSkip bool) (err err
return nil
}
}
_, err := agent.NewAgentClient(conn).CreateIndex(ctx, &payload.Control_CreateIndexRequest{
ac := agent.NewAgentClient(conn)
_, err = ac.CreateIndex(ctx, &payload.Control_CreateIndexRequest{
PoolSize: idx.creationPoolSize,
}, copts...)
if err != nil {
log.Debug(addr, err)
return err
}
_, err = ac.SaveIndex(ctx, &payload.Empty{}, copts...)
if err != nil {
log.Debug(addr, err)
return err
}
}
return nil
})
Expand Down
2 changes: 1 addition & 1 deletion versions/NGT_VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.11.4
1.11.5