Skip to content

Commit

Permalink
🔧 inject ngt service config from usecase layer
Browse files Browse the repository at this point in the history
Signed-off-by: Rintaro Okamura <[email protected]>
  • Loading branch information
rinx committed Jul 9, 2020
1 parent 1db65da commit 8cacdd5
Show file tree
Hide file tree
Showing 3 changed files with 268 additions and 64 deletions.
99 changes: 36 additions & 63 deletions pkg/agent/core/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ import (
"encoding/gob"
"os"
"path/filepath"
"reflect"
"runtime"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/vdaas/vald/internal/config"
core "github.com/vdaas/vald/internal/core/ngt"
"github.com/vdaas/vald/internal/encoding/json"
"github.com/vdaas/vald/internal/errgroup"
Expand All @@ -38,9 +37,7 @@ import (
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/metadata"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/rand"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/internal/timeutil"
"github.com/vdaas/vald/pkg/agent/core/ngt/model"
"github.com/vdaas/vald/pkg/agent/core/ngt/service/kvs"
)
Expand Down Expand Up @@ -71,27 +68,40 @@ type NGT interface {
}

type ngt struct {
alen int
// instances
core core.NGT
eg errgroup.Group
kvs kvs.BidiMap
ivc *vcaches // insertion vector cache
dvc *vcaches // deletion vector cache

// statuses
indexing atomic.Value
saveMu sync.Mutex // creating or saving index
lim time.Duration // auto indexing time limit
dur time.Duration // auto indexing check duration
sdur time.Duration // auto save index check duration
idelay time.Duration // initial delay duration
ic uint64 // insert count
nocie uint64 // number of create index execution
eg errgroup.Group
ivc *vcaches // insertion vector cache
dvc *vcaches // deletion vector cache
path string
kvs kvs.BidiMap
core core.NGT
dcd bool // disable commit daemon
inMem bool
saveMu sync.Mutex // creating or saving index

// counters
ic uint64 // insert count
nocie uint64 // number of create index execution

// configurations
inMem bool // in-memory mode

alen int // auto indexing length

lim time.Duration // auto indexing time limit
dur time.Duration // auto indexing check duration
sdur time.Duration // auto save index check duration

minLit time.Duration // minimum load index timeout
maxLit time.Duration // maximum load index timeout
litFactor time.Duration // load index timeout factor

path string // index path

idelay time.Duration // initial delay duration
dcd bool // disable commit daemon

ngtOpts []core.Option
}

type vcache struct {
Expand All @@ -103,32 +113,22 @@ const (
kvsFileName = "ngt-meta.kvsdb"
)

func New(cfg *config.NGT) (nn NGT, err error) {
func New(opts ...Option) (nn NGT, err error) {
n := new(ngt)

n.parseCfg(cfg)

opts := []core.Option{
core.WithInMemoryMode(n.inMem),
core.WithIndexPath(n.path),
core.WithDimension(cfg.Dimension),
core.WithDistanceTypeByString(cfg.DistanceType),
core.WithObjectTypeByString(cfg.ObjectType),
core.WithBulkInsertChunkSize(cfg.BulkInsertChunkSize),
core.WithCreationEdgeSize(cfg.CreationEdgeSize),
core.WithSearchEdgeSize(cfg.SearchEdgeSize),
core.WithDefaultPoolSize(cfg.DefaultPoolSize),
for _, opt := range append(defaultOpts, opts...) {
if err := opt(n); err != nil {
return nil, errors.ErrOptionFailed(err, reflect.ValueOf(opt))
}
}

n.kvs = kvs.New()

err = n.initNGT(opts...)
err = n.initNGT(n.ngtOpts...)
if err != nil {
return nil, err
}

n.eg = errgroup.Get()

if n.dur == 0 || n.alen == 0 {
n.dcd = true
}
Expand All @@ -146,33 +146,6 @@ func New(cfg *config.NGT) (nn NGT, err error) {
return n, nil
}

func (n *ngt) parseCfg(cfg *config.NGT) {
cfg.IndexPath = strings.TrimSuffix(cfg.IndexPath, "/")

n.inMem = cfg.EnableInMemoryMode

if !n.inMem && cfg.IndexPath != "" {
n.path = cfg.IndexPath
}

n.dur = timeutil.ParseWithDefault(cfg.AutoIndexCheckDuration, 0)
n.lim = timeutil.ParseWithDefault(cfg.AutoIndexDurationLimit, 0)
n.sdur = timeutil.ParseWithDefault(cfg.AutoSaveIndexDuration, 0)

if cfg.InitialDelayMaxDuration != "" {
d := timeutil.ParseWithDefault(cfg.InitialDelayMaxDuration, 0)
n.idelay = time.Duration(
int64(rand.LimitedUint32(uint64(d/time.Second))),
) * time.Second
}

n.alen = cfg.AutoIndexLength

n.minLit = timeutil.ParseWithDefault(cfg.MinLoadIndexTimeout, 3*time.Minute)
n.maxLit = timeutil.ParseWithDefault(cfg.MaxLoadIndexTimeout, 10*time.Minute)
n.litFactor = timeutil.ParseWithDefault(cfg.LoadIndexTimeoutFactor, time.Millisecond)
}

func (n *ngt) initNGT(opts ...core.Option) (err error) {
if _, err = os.Stat(n.path); os.IsNotExist(err) || n.inMem {
n.core, err = core.New(opts...)
Expand Down
209 changes: 209 additions & 0 deletions pkg/agent/core/ngt/service/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
//
// Copyright (C) 2019-2020 Vdaas.org Vald team ( kpango, rinx, kmrmt )
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package service

import (
"strings"
"time"

core "github.com/vdaas/vald/internal/core/ngt"
"github.com/vdaas/vald/internal/errgroup"
"github.com/vdaas/vald/internal/rand"
"github.com/vdaas/vald/internal/timeutil"
)

type Option func(n *ngt) error

var (
defaultOpts = []Option{
WithErrGroup(errgroup.Get()),
WithAutoIndexCheckDuration("30m"),
WithAutoIndexDurationLimit("24h"),
WithAutoSaveIndexDuration("35m"),
WithAutoIndexLength(100),
WithInitialDelayMaxDuration("3m"),
WithMinLoadIndexTimeout("3m"),
WithMaxLoadIndexTimeout("10m"),
WithLoadIndexTimeoutFactor("1ms"),
}
)

func WithErrGroup(eg errgroup.Group) Option {
return func(n *ngt) error {
if eg != nil {
n.eg = eg
}

return nil
}
}

func WithEnableInMemoryMode(enabled bool) Option {
return func(n *ngt) error {
n.inMem = enabled

return WithNGTOpts(core.WithInMemoryMode(n.inMem))(n)
}
}

func WithIndexPath(path string) Option {
return func(n *ngt) error {
n.path = strings.TrimSuffix(path, "/")

return WithNGTOpts(core.WithIndexPath(n.path))(n)
}
}

func WithAutoIndexCheckDuration(dur string) Option {
return func(n *ngt) error {
if dur == "" {
return nil
}

d, err := timeutil.Parse(dur)
if err != nil {
return err
}

n.dur = d

return nil
}
}

func WithAutoIndexDurationLimit(dur string) Option {
return func(n *ngt) error {
if dur == "" {
return nil
}

d, err := timeutil.Parse(dur)
if err != nil {
return err
}

n.lim = d

return nil
}
}

func WithAutoSaveIndexDuration(dur string) Option {
return func(n *ngt) error {
if dur == "" {
return nil
}

d, err := timeutil.Parse(dur)
if err != nil {
return err
}

n.sdur = d

return nil
}
}

func WithAutoIndexLength(l int) Option {
return func(n *ngt) error {
n.alen = l

return nil
}
}

func WithInitialDelayMaxDuration(dur string) Option {
return func(n *ngt) error {
if dur == "" {
return nil
}

d, err := timeutil.Parse(dur)
if err != nil {
return err
}

n.idelay = time.Duration(int64(rand.LimitedUint32(uint64(d/time.Second)))) * time.Second

return nil
}
}

func WithMinLoadIndexTimeout(dur string) Option {
return func(n *ngt) error {
if dur == "" {
return nil
}

d, err := timeutil.Parse(dur)
if err != nil {
return err
}

n.minLit = d

return nil
}
}

func WithMaxLoadIndexTimeout(dur string) Option {
return func(n *ngt) error {
if dur == "" {
return nil
}

d, err := timeutil.Parse(dur)
if err != nil {
return err
}

n.maxLit = d

return nil
}
}

func WithLoadIndexTimeoutFactor(dur string) Option {
return func(n *ngt) error {
if dur == "" {
return nil
}

d, err := timeutil.Parse(dur)
if err != nil {
return err
}

n.litFactor = d

return nil
}
}

func WithNGTOpts(opts ...core.Option) Option {
return func(n *ngt) error {
if n.ngtOpts == nil {
n.ngtOpts = opts
return nil
}

n.ngtOpts = append(n.ngtOpts, opts...)

return nil
}
}
Loading

0 comments on commit 8cacdd5

Please sign in to comment.