Skip to content

Commit

Permalink
[agent-NGT] refactor pkg/agent/core/service using FOP (#566)
Browse files Browse the repository at this point in the history
* 🔧 inject ngt service config from usecase layer

Signed-off-by: Rintaro Okamura <[email protected]>

* ♻️ use filepath.Clean

Signed-off-by: Rintaro Okamura <[email protected]>

* ✅ update tests

Signed-off-by: Rintaro Okamura <[email protected]>
  • Loading branch information
rinx authored and actions-user committed Jul 20, 2020
1 parent 839c13b commit 7974dcf
Show file tree
Hide file tree
Showing 4 changed files with 673 additions and 141 deletions.
99 changes: 36 additions & 63 deletions pkg/agent/core/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ import (
"encoding/gob"
"os"
"path/filepath"
"reflect"
"runtime"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/vdaas/vald/internal/config"
core "github.com/vdaas/vald/internal/core/ngt"
"github.com/vdaas/vald/internal/encoding/json"
"github.com/vdaas/vald/internal/errgroup"
Expand All @@ -38,9 +37,7 @@ import (
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/metadata"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/rand"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/internal/timeutil"
"github.com/vdaas/vald/pkg/agent/core/ngt/model"
"github.com/vdaas/vald/pkg/agent/core/ngt/service/kvs"
)
Expand Down Expand Up @@ -71,27 +68,40 @@ type NGT interface {
}

type ngt struct {
alen int
// instances
core core.NGT
eg errgroup.Group
kvs kvs.BidiMap
ivc *vcaches // insertion vector cache
dvc *vcaches // deletion vector cache

// statuses
indexing atomic.Value
saveMu sync.Mutex // creating or saving index
lim time.Duration // auto indexing time limit
dur time.Duration // auto indexing check duration
sdur time.Duration // auto save index check duration
idelay time.Duration // initial delay duration
ic uint64 // insert count
nocie uint64 // number of create index execution
eg errgroup.Group
ivc *vcaches // insertion vector cache
dvc *vcaches // deletion vector cache
path string
kvs kvs.BidiMap
core core.NGT
dcd bool // disable commit daemon
inMem bool
saveMu sync.Mutex // creating or saving index

// counters
ic uint64 // insert count
nocie uint64 // number of create index execution

// configurations
inMem bool // in-memory mode

alen int // auto indexing length

lim time.Duration // auto indexing time limit
dur time.Duration // auto indexing check duration
sdur time.Duration // auto save index check duration

minLit time.Duration // minimum load index timeout
maxLit time.Duration // maximum load index timeout
litFactor time.Duration // load index timeout factor

path string // index path

idelay time.Duration // initial delay duration
dcd bool // disable commit daemon

ngtOpts []core.Option
}

type vcache struct {
Expand All @@ -103,32 +113,22 @@ const (
kvsFileName = "ngt-meta.kvsdb"
)

func New(cfg *config.NGT) (nn NGT, err error) {
func New(opts ...Option) (nn NGT, err error) {
n := new(ngt)

n.parseCfg(cfg)

opts := []core.Option{
core.WithInMemoryMode(n.inMem),
core.WithIndexPath(n.path),
core.WithDimension(cfg.Dimension),
core.WithDistanceTypeByString(cfg.DistanceType),
core.WithObjectTypeByString(cfg.ObjectType),
core.WithBulkInsertChunkSize(cfg.BulkInsertChunkSize),
core.WithCreationEdgeSize(cfg.CreationEdgeSize),
core.WithSearchEdgeSize(cfg.SearchEdgeSize),
core.WithDefaultPoolSize(cfg.DefaultPoolSize),
for _, opt := range append(defaultOpts, opts...) {
if err := opt(n); err != nil {
return nil, errors.ErrOptionFailed(err, reflect.ValueOf(opt))
}
}

n.kvs = kvs.New()

err = n.initNGT(opts...)
err = n.initNGT(n.ngtOpts...)
if err != nil {
return nil, err
}

n.eg = errgroup.Get()

if n.dur == 0 || n.alen == 0 {
n.dcd = true
}
Expand All @@ -146,33 +146,6 @@ func New(cfg *config.NGT) (nn NGT, err error) {
return n, nil
}

func (n *ngt) parseCfg(cfg *config.NGT) {
cfg.IndexPath = strings.TrimSuffix(cfg.IndexPath, "/")

n.inMem = cfg.EnableInMemoryMode

if !n.inMem && cfg.IndexPath != "" {
n.path = cfg.IndexPath
}

n.dur = timeutil.ParseWithDefault(cfg.AutoIndexCheckDuration, 0)
n.lim = timeutil.ParseWithDefault(cfg.AutoIndexDurationLimit, 0)
n.sdur = timeutil.ParseWithDefault(cfg.AutoSaveIndexDuration, 0)

if cfg.InitialDelayMaxDuration != "" {
d := timeutil.ParseWithDefault(cfg.InitialDelayMaxDuration, 0)
n.idelay = time.Duration(
int64(rand.LimitedUint32(uint64(d/time.Second))),
) * time.Second
}

n.alen = cfg.AutoIndexLength

n.minLit = timeutil.ParseWithDefault(cfg.MinLoadIndexTimeout, 3*time.Minute)
n.maxLit = timeutil.ParseWithDefault(cfg.MaxLoadIndexTimeout, 10*time.Minute)
n.litFactor = timeutil.ParseWithDefault(cfg.LoadIndexTimeoutFactor, time.Millisecond)
}

func (n *ngt) initNGT(opts ...core.Option) (err error) {
if _, err = os.Stat(n.path); os.IsNotExist(err) || n.inMem {
n.core, err = core.New(opts...)
Expand Down
Loading

0 comments on commit 7974dcf

Please sign in to comment.