Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[agent-NGT] refactor pkg/agent/core/service using FOP #566

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 36 additions & 63 deletions pkg/agent/core/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ import (
"encoding/gob"
"os"
"path/filepath"
"reflect"
"runtime"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/vdaas/vald/internal/config"
core "github.com/vdaas/vald/internal/core/ngt"
"github.com/vdaas/vald/internal/encoding/json"
"github.com/vdaas/vald/internal/errgroup"
Expand All @@ -38,9 +37,7 @@ import (
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/metadata"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/rand"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/internal/timeutil"
"github.com/vdaas/vald/pkg/agent/core/ngt/model"
"github.com/vdaas/vald/pkg/agent/core/ngt/service/kvs"
)
Expand Down Expand Up @@ -71,27 +68,40 @@ type NGT interface {
}

type ngt struct {
alen int
// instances
core core.NGT
eg errgroup.Group
kvs kvs.BidiMap
ivc *vcaches // insertion vector cache
dvc *vcaches // deletion vector cache

// statuses
indexing atomic.Value
saveMu sync.Mutex // creating or saving index
lim time.Duration // auto indexing time limit
dur time.Duration // auto indexing check duration
sdur time.Duration // auto save index check duration
idelay time.Duration // initial delay duration
ic uint64 // insert count
nocie uint64 // number of create index execution
eg errgroup.Group
ivc *vcaches // insertion vector cache
dvc *vcaches // deletion vector cache
path string
kvs kvs.BidiMap
core core.NGT
dcd bool // disable commit daemon
inMem bool
saveMu sync.Mutex // creating or saving index

// counters
ic uint64 // insert count
nocie uint64 // number of create index execution

// configurations
inMem bool // in-memory mode

alen int // auto indexing length

lim time.Duration // auto indexing time limit
dur time.Duration // auto indexing check duration
sdur time.Duration // auto save index check duration

minLit time.Duration // minimum load index timeout
maxLit time.Duration // maximum load index timeout
litFactor time.Duration // load index timeout factor

path string // index path

idelay time.Duration // initial delay duration
dcd bool // disable commit daemon

ngtOpts []core.Option
}

type vcache struct {
Expand All @@ -103,32 +113,22 @@ const (
kvsFileName = "ngt-meta.kvsdb"
)

func New(cfg *config.NGT) (nn NGT, err error) {
func New(opts ...Option) (nn NGT, err error) {
n := new(ngt)

n.parseCfg(cfg)

opts := []core.Option{
core.WithInMemoryMode(n.inMem),
core.WithIndexPath(n.path),
core.WithDimension(cfg.Dimension),
core.WithDistanceTypeByString(cfg.DistanceType),
core.WithObjectTypeByString(cfg.ObjectType),
core.WithBulkInsertChunkSize(cfg.BulkInsertChunkSize),
core.WithCreationEdgeSize(cfg.CreationEdgeSize),
core.WithSearchEdgeSize(cfg.SearchEdgeSize),
core.WithDefaultPoolSize(cfg.DefaultPoolSize),
for _, opt := range append(defaultOpts, opts...) {
if err := opt(n); err != nil {
return nil, errors.ErrOptionFailed(err, reflect.ValueOf(opt))
}
}

n.kvs = kvs.New()

err = n.initNGT(opts...)
err = n.initNGT(n.ngtOpts...)
if err != nil {
return nil, err
}

n.eg = errgroup.Get()

if n.dur == 0 || n.alen == 0 {
n.dcd = true
}
Expand All @@ -146,33 +146,6 @@ func New(cfg *config.NGT) (nn NGT, err error) {
return n, nil
}

func (n *ngt) parseCfg(cfg *config.NGT) {
cfg.IndexPath = strings.TrimSuffix(cfg.IndexPath, "/")

n.inMem = cfg.EnableInMemoryMode

if !n.inMem && cfg.IndexPath != "" {
n.path = cfg.IndexPath
}

n.dur = timeutil.ParseWithDefault(cfg.AutoIndexCheckDuration, 0)
n.lim = timeutil.ParseWithDefault(cfg.AutoIndexDurationLimit, 0)
n.sdur = timeutil.ParseWithDefault(cfg.AutoSaveIndexDuration, 0)

if cfg.InitialDelayMaxDuration != "" {
d := timeutil.ParseWithDefault(cfg.InitialDelayMaxDuration, 0)
n.idelay = time.Duration(
int64(rand.LimitedUint32(uint64(d/time.Second))),
) * time.Second
}

n.alen = cfg.AutoIndexLength

n.minLit = timeutil.ParseWithDefault(cfg.MinLoadIndexTimeout, 3*time.Minute)
n.maxLit = timeutil.ParseWithDefault(cfg.MaxLoadIndexTimeout, 10*time.Minute)
n.litFactor = timeutil.ParseWithDefault(cfg.LoadIndexTimeoutFactor, time.Millisecond)
}

func (n *ngt) initNGT(opts ...core.Option) (err error) {
if _, err = os.Stat(n.path); os.IsNotExist(err) || n.inMem {
n.core, err = core.New(opts...)
Expand Down
Loading