Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[agent-NGT] refactor pkg/agent/core/service using FOP #566

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 36 additions & 63 deletions pkg/agent/core/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ import (
"encoding/gob"
"os"
"path/filepath"
"reflect"
"runtime"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/vdaas/vald/internal/config"
core "github.com/vdaas/vald/internal/core/ngt"
"github.com/vdaas/vald/internal/encoding/json"
"github.com/vdaas/vald/internal/errgroup"
Expand All @@ -38,9 +37,7 @@ import (
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/metadata"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/rand"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/internal/timeutil"
"github.com/vdaas/vald/pkg/agent/core/ngt/model"
"github.com/vdaas/vald/pkg/agent/core/ngt/service/kvs"
)
Expand Down Expand Up @@ -71,27 +68,40 @@ type NGT interface {
}

type ngt struct {
alen int
// instances
core core.NGT
eg errgroup.Group
kvs kvs.BidiMap
ivc *vcaches // insertion vector cache
dvc *vcaches // deletion vector cache

// statuses
indexing atomic.Value
saveMu sync.Mutex // creating or saving index
lim time.Duration // auto indexing time limit
dur time.Duration // auto indexing check duration
sdur time.Duration // auto save index check duration
idelay time.Duration // initial delay duration
ic uint64 // insert count
nocie uint64 // number of create index execution
eg errgroup.Group
ivc *vcaches // insertion vector cache
dvc *vcaches // deletion vector cache
path string
kvs kvs.BidiMap
core core.NGT
dcd bool // disable commit daemon
inMem bool
saveMu sync.Mutex // creating or saving index

// counters
ic uint64 // insert count
nocie uint64 // number of create index execution

// configurations
inMem bool // in-memory mode

alen int // auto indexing length

lim time.Duration // auto indexing time limit
dur time.Duration // auto indexing check duration
sdur time.Duration // auto save index check duration

minLit time.Duration // minimum load index timeout
maxLit time.Duration // maximum load index timeout
litFactor time.Duration // load index timeout factor

path string // index path

idelay time.Duration // initial delay duration
dcd bool // disable commit daemon

ngtOpts []core.Option
}

type vcache struct {
Expand All @@ -103,32 +113,22 @@ const (
kvsFileName = "ngt-meta.kvsdb"
)

func New(cfg *config.NGT) (nn NGT, err error) {
func New(opts ...Option) (nn NGT, err error) {
n := new(ngt)

n.parseCfg(cfg)

opts := []core.Option{
core.WithInMemoryMode(n.inMem),
core.WithIndexPath(n.path),
core.WithDimension(cfg.Dimension),
core.WithDistanceTypeByString(cfg.DistanceType),
core.WithObjectTypeByString(cfg.ObjectType),
core.WithBulkInsertChunkSize(cfg.BulkInsertChunkSize),
core.WithCreationEdgeSize(cfg.CreationEdgeSize),
core.WithSearchEdgeSize(cfg.SearchEdgeSize),
core.WithDefaultPoolSize(cfg.DefaultPoolSize),
for _, opt := range append(defaultOpts, opts...) {
if err := opt(n); err != nil {
return nil, errors.ErrOptionFailed(err, reflect.ValueOf(opt))
}
}

n.kvs = kvs.New()

err = n.initNGT(opts...)
err = n.initNGT(n.ngtOpts...)
if err != nil {
return nil, err
}

n.eg = errgroup.Get()

if n.dur == 0 || n.alen == 0 {
n.dcd = true
}
Expand All @@ -146,33 +146,6 @@ func New(cfg *config.NGT) (nn NGT, err error) {
return n, nil
}

func (n *ngt) parseCfg(cfg *config.NGT) {
cfg.IndexPath = strings.TrimSuffix(cfg.IndexPath, "/")

n.inMem = cfg.EnableInMemoryMode

if !n.inMem && cfg.IndexPath != "" {
n.path = cfg.IndexPath
}

n.dur = timeutil.ParseWithDefault(cfg.AutoIndexCheckDuration, 0)
n.lim = timeutil.ParseWithDefault(cfg.AutoIndexDurationLimit, 0)
n.sdur = timeutil.ParseWithDefault(cfg.AutoSaveIndexDuration, 0)

if cfg.InitialDelayMaxDuration != "" {
d := timeutil.ParseWithDefault(cfg.InitialDelayMaxDuration, 0)
n.idelay = time.Duration(
int64(rand.LimitedUint32(uint64(d/time.Second))),
) * time.Second
}

n.alen = cfg.AutoIndexLength

n.minLit = timeutil.ParseWithDefault(cfg.MinLoadIndexTimeout, 3*time.Minute)
n.maxLit = timeutil.ParseWithDefault(cfg.MaxLoadIndexTimeout, 10*time.Minute)
n.litFactor = timeutil.ParseWithDefault(cfg.LoadIndexTimeoutFactor, time.Millisecond)
}

func (n *ngt) initNGT(opts ...core.Option) (err error) {
if _, err = os.Stat(n.path); os.IsNotExist(err) || n.inMem {
n.core, err = core.New(opts...)
Expand Down
209 changes: 209 additions & 0 deletions pkg/agent/core/ngt/service/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
//
// Copyright (C) 2019-2020 Vdaas.org Vald team ( kpango, rinx, kmrmt )
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package service

import (
"strings"
"time"

core "github.com/vdaas/vald/internal/core/ngt"
"github.com/vdaas/vald/internal/errgroup"
"github.com/vdaas/vald/internal/rand"
"github.com/vdaas/vald/internal/timeutil"
)

type Option func(n *ngt) error

var (
defaultOpts = []Option{
WithErrGroup(errgroup.Get()),
WithAutoIndexCheckDuration("30m"),
WithAutoIndexDurationLimit("24h"),
WithAutoSaveIndexDuration("35m"),
WithAutoIndexLength(100),
WithInitialDelayMaxDuration("3m"),
WithMinLoadIndexTimeout("3m"),
WithMaxLoadIndexTimeout("10m"),
WithLoadIndexTimeoutFactor("1ms"),
}
)

func WithErrGroup(eg errgroup.Group) Option {
return func(n *ngt) error {
if eg != nil {
n.eg = eg
}

return nil
}
}

func WithEnableInMemoryMode(enabled bool) Option {
return func(n *ngt) error {
n.inMem = enabled

return WithNGTOpts(core.WithInMemoryMode(n.inMem))(n)
}
}

func WithIndexPath(path string) Option {
return func(n *ngt) error {
n.path = strings.TrimSuffix(path, "/")
rinx marked this conversation as resolved.
Show resolved Hide resolved

return WithNGTOpts(core.WithIndexPath(n.path))(n)
}
}

func WithAutoIndexCheckDuration(dur string) Option {
return func(n *ngt) error {
if dur == "" {
return nil
}

d, err := timeutil.Parse(dur)
if err != nil {
return err
}

n.dur = d

return nil
}
}

func WithAutoIndexDurationLimit(dur string) Option {
return func(n *ngt) error {
if dur == "" {
return nil
}

d, err := timeutil.Parse(dur)
if err != nil {
return err
}

n.lim = d

return nil
}
}

func WithAutoSaveIndexDuration(dur string) Option {
return func(n *ngt) error {
if dur == "" {
return nil
}

d, err := timeutil.Parse(dur)
if err != nil {
return err
}

n.sdur = d

return nil
}
}

func WithAutoIndexLength(l int) Option {
return func(n *ngt) error {
n.alen = l

return nil
}
}

func WithInitialDelayMaxDuration(dur string) Option {
return func(n *ngt) error {
if dur == "" {
return nil
}

d, err := timeutil.Parse(dur)
if err != nil {
return err
}

n.idelay = time.Duration(int64(rand.LimitedUint32(uint64(d/time.Second)))) * time.Second

return nil
}
}

func WithMinLoadIndexTimeout(dur string) Option {
return func(n *ngt) error {
if dur == "" {
return nil
}

d, err := timeutil.Parse(dur)
if err != nil {
return err
}

n.minLit = d

return nil
}
}

func WithMaxLoadIndexTimeout(dur string) Option {
return func(n *ngt) error {
if dur == "" {
return nil
}

d, err := timeutil.Parse(dur)
if err != nil {
return err
}

n.maxLit = d

return nil
}
}

func WithLoadIndexTimeoutFactor(dur string) Option {
return func(n *ngt) error {
if dur == "" {
return nil
}

d, err := timeutil.Parse(dur)
if err != nil {
return err
}

n.litFactor = d

return nil
}
}

func WithNGTOpts(opts ...core.Option) Option {
return func(n *ngt) error {
if n.ngtOpts == nil {
n.ngtOpts = opts
return nil
}

n.ngtOpts = append(n.ngtOpts, opts...)

return nil
}
}
Loading