diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index f86263b5d2..3e5c434ae6 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -10,5 +10,6 @@ "moby": "true" } }, - "postCreateCommand": "go version" + "postCreateCommand": "go version", + "postAttachCommand": "sudo ln -s $(pwd)/cmd/agent/core/ngt/sample.yaml /etc/server/config.yaml" } diff --git a/.gitignore b/.gitignore index 78d95d3c11..fd593b1c6c 100755 --- a/.gitignore +++ b/.gitignore @@ -51,4 +51,7 @@ hack/go.mod.default3 # for nvim .nvimlog +# for vscode +.vscode/ + telepresence.log diff --git a/Makefile.d/proto.mk b/Makefile.d/proto.mk index 200804406e..f0871b0c7c 100644 --- a/Makefile.d/proto.mk +++ b/Makefile.d/proto.mk @@ -58,8 +58,7 @@ proto/deps: \ $(GOPATH)/src/github.com/planetscale/vtprotobuf \ $(GOPATH)/src/github.com/protocolbuffers/protobuf \ $(GOPATH)/src/google.golang.org/genproto \ - $(GOPATH)/src/google.golang.org/protobuf \ - $(ROOTDIR)/apis/proto/v1 + $(GOPATH)/src/google.golang.org/protobuf $(GOPATH)/src/github.com/protocolbuffers/protobuf: git clone \ diff --git a/charts/vald/values.yaml b/charts/vald/values.yaml index 4fd6742e1e..353cce0d24 100644 --- a/charts/vald/values.yaml +++ b/charts/vald/values.yaml @@ -1715,6 +1715,9 @@ agent: # @schema {"name": "agent.ngt.kvsdb.concurrency", "type": "integer"} # agent.ngt.kvsdb.concurrency -- kvsdb processing concurrency concurrency: 6 + # @schema {"name": "agent.ngt.broken_index_history_limit", "type": "integer"} + # agent.ngt.broken_index_history_limit -- maximum number of broken index generations to backup + broken_index_history_limit: 0 # @schema {"name": "agent.sidecar", "type": "object"} sidecar: # @schema {"name": "agent.sidecar.enabled", "type": "boolean"} diff --git a/cmd/agent/core/ngt/sample.yaml b/cmd/agent/core/ngt/sample.yaml index 9a18e303f0..403d786b7a 100644 --- a/cmd/agent/core/ngt/sample.yaml +++ b/cmd/agent/core/ngt/sample.yaml @@ -126,3 +126,4 @@ ngt: object_type: float search_edge_size: 10 enable_copy_on_write: false + broken_index_history_limit: 0 diff --git a/dockers/dev/Dockerfile b/dockers/dev/Dockerfile index 896d54dbd1..790183a51d 100644 --- a/dockers/dev/Dockerfile +++ b/dockers/dev/Dockerfile @@ -60,6 +60,11 @@ RUN make ngt/install \ && make kubectl/install \ && make k3d/install +# change GOPATH owner to vscode to run commands as user vscode without root permission +RUN chown -R vscode:vscode "${GOPATH}" \ + # mkdir for agent configuration install + && mkdir -p /etc/server + # k9s installs the binary to the current user which is root for devcontainer # so change the current user to vscode to user afterwards USER vscode diff --git a/internal/config/ngt.go b/internal/config/ngt.go index bc8e6c1b4d..1ad56ed786 100644 --- a/internal/config/ngt.go +++ b/internal/config/ngt.go @@ -88,6 +88,9 @@ type NGT struct { // KVSDB represent the ngt bidirectional kv store configuration KVSDB *KVSDB `json:"kvsdb,omitempty" yaml:"kvsdb"` + + // BrokenIndexHistoryLimit represents the maximum number of broken index generations that will be backed up + BrokenIndexHistoryLimit int `yaml:"broken_index_history_limit" json:"broken_index_history_limit,omitempty"` } // KVSDB represent the ngt vector bidirectional kv store configuration diff --git a/internal/core/algorithm/ngt/ngt.go b/internal/core/algorithm/ngt/ngt.go index e94ae09fb7..b8f3a7e7bf 100644 --- a/internal/core/algorithm/ngt/ngt.go +++ b/internal/core/algorithm/ngt/ngt.go @@ -25,7 +25,6 @@ package ngt import "C" import ( - "os" "reflect" "sync" "unsafe" @@ -307,18 +306,6 @@ func (n *ngt) loadOptions(opts ...Option) (err error) { } func (n *ngt) create() (err error) { - files, err := file.ListInDir(n.idxPath) - if err == nil && len(files) != 0 { - log.Warnf("index path exists, will remove the directories: %v", files) - for _, f := range files { - err = os.RemoveAll(f) - if err != nil { - return err - } - } - } else if err != nil { - log.Debug(err) - } path := C.CString(n.idxPath) defer C.free(unsafe.Pointer(path)) diff --git a/internal/errors/errors.go b/internal/errors/errors.go index b9585230c9..ecf39000fa 100644 --- a/internal/errors/errors.go +++ b/internal/errors/errors.go @@ -80,6 +80,11 @@ var ( return Wrapf(err, "failed to output %s logs", str) } + // ErrIndexPathNotExists represents a function to generate an error that the index path is not exists. + ErrIndexPathNotExists = func(path string) error { + return Errorf("index path %s not exists", path) + } + // New represents a function to generate the new error with a message. // When the message is nil, it will return nil instead of an error. New = func(msg string) error { diff --git a/internal/test/data/agent/ngt/validIndex/README.md b/internal/test/data/agent/ngt/validIndex/README.md new file mode 100644 index 0000000000..abbb0d9b65 --- /dev/null +++ b/internal/test/data/agent/ngt/validIndex/README.md @@ -0,0 +1 @@ +This directory holds an outcome of inserting `datafashion-mnist-784-euclidean` to Vald agent by using `example/client/agent/main.go` with `insertCount = 100`. diff --git a/internal/test/data/agent/ngt/validIndex/grp b/internal/test/data/agent/ngt/validIndex/grp new file mode 100644 index 0000000000..7fd2428b68 Binary files /dev/null and b/internal/test/data/agent/ngt/validIndex/grp differ diff --git a/internal/test/data/agent/ngt/validIndex/metadata.json b/internal/test/data/agent/ngt/validIndex/metadata.json new file mode 100644 index 0000000000..bb2b1e9897 --- /dev/null +++ b/internal/test/data/agent/ngt/validIndex/metadata.json @@ -0,0 +1 @@ +{ "is_invalid": false, "ngt": { "index_count": 100 } } diff --git a/internal/test/data/agent/ngt/validIndex/ngt-meta.kvsdb b/internal/test/data/agent/ngt/validIndex/ngt-meta.kvsdb new file mode 100644 index 0000000000..d626b0bcd2 Binary files /dev/null and b/internal/test/data/agent/ngt/validIndex/ngt-meta.kvsdb differ diff --git a/internal/test/data/agent/ngt/validIndex/ngt-timestamp.kvsdb b/internal/test/data/agent/ngt/validIndex/ngt-timestamp.kvsdb new file mode 100644 index 0000000000..4690f59811 Binary files /dev/null and b/internal/test/data/agent/ngt/validIndex/ngt-timestamp.kvsdb differ diff --git a/internal/test/data/agent/ngt/validIndex/obj b/internal/test/data/agent/ngt/validIndex/obj new file mode 100644 index 0000000000..c2af6b5395 Binary files /dev/null and b/internal/test/data/agent/ngt/validIndex/obj differ diff --git a/internal/test/data/agent/ngt/validIndex/prf b/internal/test/data/agent/ngt/validIndex/prf new file mode 100644 index 0000000000..c271714025 --- /dev/null +++ b/internal/test/data/agent/ngt/validIndex/prf @@ -0,0 +1,26 @@ +AccuracyTable +BatchSizeForCreation 200 +BuildTimeLimit 0 +DatabaseType Memory +Dimension 784 +DistanceType L2 +DynamicEdgeSizeBase 30 +DynamicEdgeSizeRate 20 +EdgeSizeForCreation 20 +EdgeSizeForSearch 10 +EdgeSizeLimitForCreation 5 +EpsilonForCreation 0.1 +GraphType ANNG +IncomingEdge 80 +IncrimentalEdgeSizeLimitForTruncation 0 +IndexType GraphAndTree +ObjectAlignment False +ObjectType Float-4 +OutgoingEdge 10 +PathAdjustmentInterval 0 +PrefetchOffset 1 +PrefetchSize 3136 +SeedSize 10 +SeedType None +ThreadPoolSize 32 +TruncationThreadPoolSize 8 diff --git a/internal/test/data/agent/ngt/validIndex/tre b/internal/test/data/agent/ngt/validIndex/tre new file mode 100644 index 0000000000..8e27a21768 Binary files /dev/null and b/internal/test/data/agent/ngt/validIndex/tre differ diff --git a/internal/test/testdata.go b/internal/test/testdata.go index 3085b30b52..a242c419e5 100644 --- a/internal/test/testdata.go +++ b/internal/test/testdata.go @@ -21,6 +21,10 @@ import ( "github.com/vdaas/vald/internal/strings" ) +const ( + ValidIndex = "agent/ngt/validIndex" +) + // GetTestdataPath returns the test data file path under `internal/test/data`. func GetTestdataPath(filename string) string { return file.Join(baseDir(), "/internal/test/data/", filename) diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index 56d2f6a46a..becfc22e02 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -20,6 +20,7 @@ package service import ( "context" "encoding/gob" + "fmt" "io/fs" "math" "os" @@ -39,6 +40,7 @@ import ( "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/observability/trace" "github.com/vdaas/vald/internal/safety" + "github.com/vdaas/vald/internal/slices" "github.com/vdaas/vald/internal/strings" "github.com/vdaas/vald/pkg/agent/core/ngt/model" "github.com/vdaas/vald/pkg/agent/core/ngt/service/kvs" @@ -117,13 +119,14 @@ type ngt struct { enableProactiveGC bool // if this value is true, agent component will purge GC memory more proactive enableCopyOnWrite bool // if this value is true, agent component will write backup file using Copy on Write and saves old files to the old directory - path string // index path - smu sync.Mutex // save index lock - tmpPath atomic.Value // temporary index path for Copy on Write - oldPath string // old volume path - basePath string // index base directory for CoW - cowmu sync.Mutex // copy on write move lock - backupGen uint64 // number of backup generation + path string // index path + smu sync.Mutex // save index lock + tmpPath atomic.Value // temporary index path for Copy on Write + oldPath string // old volume path + basePath string // index base directory for CoW + brokenPath string // backup broken index path + cowmu sync.Mutex // copy on write move lock + backupGen uint64 // number of backup generation poolSize uint32 // default pool size radius float32 // default radius @@ -133,6 +136,7 @@ type ngt struct { dcd bool // disable commit daemon kvsdbConcurrency int // kvsdb concurrency + historyLimit int // the maximum generation number of broken index backup } const ( @@ -142,6 +146,7 @@ const ( oldIndexDirName = "backup" originIndexDirName = "origin" + brokenIndexDirName = "broken" ) func New(cfg *config.NGT, opts ...Option) (nn NGT, err error) { @@ -151,6 +156,7 @@ func New(cfg *config.NGT, opts ...Option) (nn NGT, err error) { enableProactiveGC: cfg.EnableProactiveGC, enableCopyOnWrite: cfg.EnableCopyOnWrite, kvsdbConcurrency: cfg.KVSDB.Concurrency, + historyLimit: cfg.BrokenIndexHistoryLimit, } for _, opt := range append(defaultOptions, opts...) { @@ -163,9 +169,16 @@ func New(cfg *config.NGT, opts ...Option) (nn NGT, err error) { n.inMem = true } - err = n.prepareFolders() - if err != nil { - return nil, err + // prepare directories to store index only when it not in-memory mode + if !n.inMem { + if !file.Exists(n.path) { + return nil, errors.ErrIndexPathNotExists(n.path) + } + ctx := context.Background() + err = n.prepareFolders(ctx) + if err != nil { + return nil, err + } } err = n.initNGT( @@ -198,23 +211,88 @@ func New(cfg *config.NGT, opts ...Option) (nn NGT, err error) { return n, nil } -func (n *ngt) prepareFolders() (err error) { - if n.enableCopyOnWrite && !n.inMem && len(n.path) != 0 { - sep := string(os.PathSeparator) - absPath, err := filepath.Abs(strings.ReplaceAll(n.path, sep+sep, sep)) - if err != nil { - log.Warnf("keep going with relative path:\t%v", err) - } else { - n.path = absPath +// migrate migrates the index directory from old to new under the input path if necessary. +// Migration happens when the path is not empty and there is no `path/origin` directory, +// which indicates that the user has NOT been using CoW mode and the index directory is not migrated yet. +func migrate(ctx context.Context, path string) (err error) { + // check if migration is required + files, err := file.ListInDir(path) + if err != nil { + return err + } + if len(files) == 0 { + // empty directory doesn't need migration + return nil + } + od := filepath.Join(path, originIndexDirName) + for _, file := range files { + if file == od { + // origin folder exists. meaning already migrated + return nil } - n.basePath = n.path - n.oldPath = file.Join(n.basePath, oldIndexDirName) - n.path = file.Join(n.basePath, originIndexDirName) - err = file.MkdirAll(n.oldPath, fs.ModePerm) + } + + // at this point, there is something in the path, but there is no `path/origin`, which means migration is required + // so create origin and move all contents in path to `path/origin` + + // first move all contents to temporary directory because it's not possible to directly move directory to its subdirectory + tp, err := file.MkdirTemp("") + if err != nil { + return err + } + err = file.MoveDir(ctx, path, tp) + if err != nil { + return err + } + + // recreate the path again to move contents to `path/origin` lately + err = file.MkdirAll(path, fs.ModePerm) + if err != nil { + return err + } + + // finally move to `path/origin` directory + err = file.MoveDir(ctx, tp, file.Join(path, originIndexDirName)) + if err != nil { + return err + } + + return nil +} + +func (n *ngt) prepareFolders(ctx context.Context) (err error) { + // migrate from old index directory to new index directory if necessary + if !n.enableCopyOnWrite { + err = migrate(ctx, n.path) if err != nil { - log.Warn(err) + return err } - err = file.MkdirAll(n.path, fs.ModePerm) + } + + // set paths + sep := string(os.PathSeparator) + n.path, err = filepath.Abs(strings.ReplaceAll(n.path, sep+sep, sep)) + if err != nil { + log.Warn(err) + } + n.basePath = n.path + n.oldPath = file.Join(n.basePath, oldIndexDirName) + n.path = file.Join(n.basePath, originIndexDirName) + n.brokenPath = file.Join(n.basePath, brokenIndexDirName) + + // initialize origin and broken index backup directory + // the path does not differ if it's CoW mode or not + err = file.MkdirAll(n.path, fs.ModePerm) + if err != nil { + log.Warn(err) + } + err = file.MkdirAll(n.brokenPath, fs.ModePerm) + if err != nil { + log.Warnf("failed to create a folder for broken index backup: %v", err) + } + + if n.enableCopyOnWrite && len(n.path) != 0 { + err = file.MkdirAll(n.oldPath, fs.ModePerm) if err != nil { log.Warn(err) } @@ -389,6 +467,136 @@ func (n *ngt) load(ctx context.Context, path string, opts ...core.Option) (err e return nil } +// backupBroken backup index at originPath into brokenDir. +// The name of the directory will be timestamp(UnixNano). +// If it exeeds the limit, backupBroken removes the oldest backup directory. +func backupBroken(ctx context.Context, originPath string, brokenDir string, limit int) error { + if limit <= 0 { + return nil + } + + // do nothing when origin path is empty + files, err := file.ListInDir(originPath) + if err != nil { + return err + } + if len(files) == 0 { + return nil + } + + // how many generation exists in the path? + files, err = file.ListInDir(brokenDir) + if err != nil { + return err + } + + if len(files) >= limit { + // remove the oldest + log.Infof("There's already more than %v broken index generations stored. Thus removing the oldest.", limit) + slices.Sort(files) + if err := os.RemoveAll(files[0]); err != nil { + return err + } + } + + // create directory for new generation broken index + name := time.Now().UnixNano() + dest := filepath.Join(brokenDir, fmt.Sprint(name)) + + // move index to the new directory + err = file.MoveDir(ctx, originPath, dest) + if err != nil { + return err + } + return nil +} + +// needsBackup checks if the backup is needed. +func needsBackup(backupPath string) bool { + // Initial state where there's only grp, obj, prf, tre -> false + files, err := file.ListInDir(backupPath) + if err != nil { + return false + } + // Check if there's *.json or *.kvsdb + initialState := true + for _, f := range files { + if strings.HasSuffix(f, ".json") || strings.HasSuffix(f, ".kvsdb") { + initialState = false + break + } + } + if initialState { + return false + } + + // Not initial state but no metadata.json exists -> true + hasMetadataJSON := false + for _, f := range files { + if filepath.Base(f) == metadata.AgentMetadataFileName { + hasMetadataJSON = true + break + } + } + if !hasMetadataJSON { + return true + } + + // Now check the metadata.json to see if backup is required + metadataPath := filepath.Join(backupPath, metadata.AgentMetadataFileName) + meta, err := metadata.Load(metadataPath) + if err != nil { + return false + } + + if meta.IsInvalid || meta.NGT.IndexCount > 0 { + return true + } + + return false +} + +// rebuild rebuilds the index directory with a clean state. When it is required, it moves the current broken index +// to the broken directory until it reaches the limit. If the limit is reached, it removes the oldest. +// the `path` input is the path to rebuild the index directory. It is identical to n.path when CoW is disabled +// and is a temporal path when CoW is enabled. +func (n *ngt) rebuild(ctx context.Context, path string, opts ...core.Option) (err error) { + // backup when it is required + if needsBackup(n.path) { + log.Infof("starting to backup broken index at %v", n.path) + err = backupBroken(ctx, n.path, n.brokenPath, n.historyLimit) + if err != nil { + log.Warnf("failed to backup broken index. will remove it and restart: %v", err) + } else { + // remake the path since it has been moved to broken directory + err = file.MkdirAll(n.path, fs.ModePerm) + if err != nil { + return fmt.Errorf("failed to recreate the index directory: %w", err) + } + } + } + + // remove the index directory and restart with a clean state + files, err := file.ListInDir(path) + if err == nil && len(files) != 0 { + log.Warnf("index path exists, will remove the directories: %v", files) + for _, f := range files { + err = os.RemoveAll(f) + if err != nil { + return err + } + } + } else if err != nil { + log.Debug(err) + } + + n.core, err = core.New(append(opts, core.WithIndexPath(path))...) + if err != nil { + return fmt.Errorf("failed to create new core: %w", err) + } + return nil +} + func (n *ngt) initNGT(opts ...core.Option) (err error) { if n.kvs == nil { n.kvs = kvs.New(kvs.WithConcurrency(n.kvsdbConcurrency)) @@ -415,8 +623,7 @@ func (n *ngt) initNGT(opts ...core.Option) (err error) { n.core.Close() n.core = nil } - n.core, err = core.New(append(opts, core.WithIndexPath(n.path))...) - return err + return n.rebuild(ctx, n.path, opts...) } if errors.Is(err, errors.ErrIndicesAreTooFewComparedToMetadata) && n.kvs != nil { current = n.kvs.Len() @@ -471,7 +678,7 @@ func (n *ngt) initNGT(opts ...core.Option) (err error) { n.core.Close() n.core = nil } - n.core, err = core.New(append(opts, core.WithIndexPath(tpath))...) + err = n.rebuild(ctx, tpath, opts...) if err != nil { return err } diff --git a/pkg/agent/core/ngt/service/ngt_test.go b/pkg/agent/core/ngt/service/ngt_test.go index 0596a45166..c6de113167 100644 --- a/pkg/agent/core/ngt/service/ngt_test.go +++ b/pkg/agent/core/ngt/service/ngt_test.go @@ -20,8 +20,12 @@ package service import ( "context" "fmt" + "io/fs" "math" + "os" + "path/filepath" "reflect" + "strings" "sync" "sync/atomic" "testing" @@ -37,11 +41,13 @@ import ( "github.com/vdaas/vald/internal/file" "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/safety" + testdata "github.com/vdaas/vald/internal/test" "github.com/vdaas/vald/internal/test/data/vector" "github.com/vdaas/vald/internal/test/goleak" "github.com/vdaas/vald/pkg/agent/core/ngt/model" "github.com/vdaas/vald/pkg/agent/core/ngt/service/kvs" "github.com/vdaas/vald/pkg/agent/core/ngt/service/vqueue" + "github.com/vdaas/vald/pkg/agent/internal/metadata" "google.golang.org/grpc" ) @@ -72,25 +78,193 @@ func TestNew(t *testing.T) { } return nil } + defaultConfig := config.NGT{ + Dimension: 100, + DistanceType: "l2", + ObjectType: "float", + BulkInsertChunkSize: 10, + CreationEdgeSize: 20, + SearchEdgeSize: 10, + EnableProactiveGC: false, + EnableCopyOnWrite: false, + KVSDB: &config.KVSDB{ + Concurrency: 10, + }, + BrokenIndexHistoryLimit: 1, + } tests := []test{ func() test { tmpDir := t.TempDir() + brokenDir := filepath.Join(tmpDir, brokenIndexDirName) + return test{ + name: "New creates `origin` and `broken` directory with default options", + args: args{ + cfg: &defaultConfig, + opts: []Option{ + WithIndexPath(tmpDir), + }, + }, + want: want{ + err: nil, + }, + checkFunc: func(w want, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) + } + dirs, err := file.ListInDir(tmpDir) + if err != nil { + return err + } + + // extract folder name from dir path into a map + dirSet := make(map[string]struct{}, len(dirs)) + for _, dir := range dirs { + // extract folder name from dir path + dir = dir[len(tmpDir)+1:] + dirSet[dir] = struct{}{} + } + + // check if the dirs set contains folder names origin, backup and broken. + if _, ok := dirSet[originIndexDirName]; !ok { + return fmt.Errorf("failed to create origin dir") + } + if _, ok := dirSet[brokenIndexDirName]; !ok { + return fmt.Errorf("failed to create broken dir") + } + + // check if the broken index directory is empty + files, err := file.ListInDir(brokenDir) + if err != nil { + return err + } + if len(files) != 0 { + return fmt.Errorf("broken index directory is not empty") + } + return nil + }, + } + }(), + func() test { + tmpDir := t.TempDir() + originDir := filepath.Join(tmpDir, originIndexDirName) + testIndexDir := testdata.GetTestdataPath(testdata.ValidIndex) + return test{ + name: "New migrates index files into `origin`", + args: args{ + cfg: &defaultConfig, + opts: []Option{ + WithIndexPath(tmpDir), + }, + }, + want: want{ + err: nil, + }, + beforeFunc: func(t *testing.T, args args) { + t.Helper() + // copy testdata index files into tmpDir which is a old index directory + // this should be moved to origin directory by the migration process + file.CopyDir(context.Background(), testIndexDir, tmpDir) + }, + checkFunc: func(w want, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) + } + files, err := file.ListInDir(tmpDir) + if err != nil { + return err + } + + // extract folder name from dir path into a map + dirSet := make(map[string]struct{}, len(files)) + for _, dir := range files { + // extract folder name from dir path + dirSet[filepath.Base(dir)] = struct{}{} + } + + // check if the dirs set contains folder names origin, backup and broken. + if _, ok := dirSet[originIndexDirName]; !ok { + return fmt.Errorf("failed to create origin dir") + } + if _, ok := dirSet[brokenIndexDirName]; !ok { + return fmt.Errorf("failed to create broken dir") + } + + // check if the origin index directory has index files + files, err = file.ListInDir(originDir) + if err != nil { + return err + } + if len(files) == 0 { + return fmt.Errorf("migration failed to move index files") + } + return nil + }, + } + }(), + func() test { + tmpDir := t.TempDir() + originDir := filepath.Join(tmpDir, originIndexDirName) + testIndexDir := testdata.GetTestdataPath(testdata.ValidIndex) return test{ - name: "success with default options", + name: "New migrates does not migrate index files if origin directory already exists", args: args{ - cfg: &config.NGT{ - Dimension: 100, - DistanceType: "l2", - ObjectType: "float", - BulkInsertChunkSize: 10, - CreationEdgeSize: 20, - SearchEdgeSize: 10, - EnableProactiveGC: false, - EnableCopyOnWrite: false, - KVSDB: &config.KVSDB{ - Concurrency: 10, - }, + cfg: &defaultConfig, + opts: []Option{ + WithIndexPath(tmpDir), }, + }, + want: want{ + err: nil, + }, + beforeFunc: func(t *testing.T, args args) { + t.Helper() + // copy testdata index files into tmpDir which is a old index directory + err := file.CopyDir(context.Background(), testIndexDir, tmpDir) + if err != nil { + t.Errorf("failed to copy testdata index files: %v", err) + } + + // copy testdata index files into tmpDir which is a old index directory + err = file.MkdirAll(originDir, fs.ModePerm) + if err != nil { + t.Errorf("failed to create origin directory: %v", err) + } + err = file.CopyDir(context.Background(), testIndexDir, originDir) + if err != nil { + t.Errorf("failed to copy testdata index files: %v", err) + } + }, + checkFunc: func(w want, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) + } + files, err := file.ListInDir(tmpDir) + if err != nil { + return err + } + + metadataExists := false + for _, file := range files { + if filepath.Base(file) == "metadata.json" { + metadataExists = true + } + } + if !metadataExists { + return fmt.Errorf("migration should not happen") + } + return nil + }, + } + }(), + func() test { + tmpDir := t.TempDir() + brokenDir := filepath.Join(tmpDir, brokenIndexDirName) + config := defaultConfig + config.EnableCopyOnWrite = true + return test{ + name: "New creates `origin`, `backup` and `broken` directory with CoW enabled", + args: args{ + cfg: &config, opts: []Option{ WithIndexPath(tmpDir), }, @@ -98,7 +272,215 @@ func TestNew(t *testing.T) { want: want{ err: nil, }, - checkFunc: defaultCheckFunc, + checkFunc: func(w want, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) + } + dirs, err := file.ListInDir(tmpDir) + if err != nil { + return err + } + + // extract folder name from dir path into a map + dirSet := make(map[string]struct{}, len(dirs)) + for _, dir := range dirs { + // extract folder name from dir path + dir = dir[len(tmpDir)+1:] + dirSet[dir] = struct{}{} + } + + // check if the dirs set contains folder names origin, backup and broken. + if _, ok := dirSet[originIndexDirName]; !ok { + return fmt.Errorf("failed to create origin dir") + } + if _, ok := dirSet[oldIndexDirName]; !ok { + return fmt.Errorf("failed to create backup dir") + } + if _, ok := dirSet[brokenIndexDirName]; !ok { + return fmt.Errorf("failed to create broken dir") + } + + // check if the broken index directory is empty + files, err := file.ListInDir(brokenDir) + if err != nil { + return err + } + if len(files) != 0 { + return fmt.Errorf("broken index directory is not empty") + } + + return nil + }, + } + }(), + func() test { + tmpDir := t.TempDir() + originDir := filepath.Join(tmpDir, originIndexDirName) + brokenDir := filepath.Join(tmpDir, brokenIndexDirName) + testIndexDir := testdata.GetTestdataPath(testdata.ValidIndex) + config := defaultConfig + config.BrokenIndexHistoryLimit = 1 + return test{ + name: "New succeeds to backup broken index", + args: args{ + cfg: &config, + opts: []Option{ + WithIndexPath(tmpDir), + }, + }, + want: want{ + err: nil, + }, + beforeFunc: func(t *testing.T, args args) { + t.Helper() + if err := file.MkdirAll(originDir, fs.ModePerm); err != nil { + t.Errorf("failed to create origin dir: %v", err) + } + file.CopyDir(context.Background(), testIndexDir, originDir) + // remove metadata.json to make it broken + if err := os.Remove(filepath.Join(originDir, "metadata.json")); err != nil { + t.Errorf("failed to remove index file: %v", err) + } + }, + checkFunc: func(w want, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) + } + files, err := file.ListInDir(brokenDir) + if err != nil { + return err + } + if len(files) != 1 { + return fmt.Errorf("only one generation should be in broken dir") + } + + broken, err := file.ListInDir(files[0]) + if err != nil { + return err + } + if len(broken) == 0 { + return fmt.Errorf("failed to move broken index files") + } + return nil + }, + } + }(), + func() test { + tmpDir := t.TempDir() + originDir := filepath.Join(tmpDir, originIndexDirName) + brokenDir := filepath.Join(tmpDir, brokenIndexDirName) + testIndexDir := testdata.GetTestdataPath(testdata.ValidIndex) + config := defaultConfig + config.BrokenIndexHistoryLimit = 1 + return test{ + name: "New succeeds to rotate broken index backup when the number of generations exceeds the limit", + args: args{ + cfg: &config, + opts: []Option{ + WithIndexPath(tmpDir), + }, + }, + want: want{ + err: nil, + }, + beforeFunc: func(t *testing.T, args args) { + t.Helper() + if err := file.MkdirAll(originDir, fs.ModePerm); err != nil { + t.Errorf("failed to create origin dir: %v", err) + } + file.CopyDir(context.Background(), testIndexDir, originDir) + // remove metadata.json to make it broken + if err := os.Remove(filepath.Join(originDir, "metadata.json")); err != nil { + t.Errorf("failed to remove index file: %v", err) + } + + if err := file.MkdirAll(brokenDir, fs.ModePerm); err != nil { + t.Errorf("failed to create broken dir: %v", err) + } + gen1 := filepath.Join(brokenDir, fmt.Sprint(time.Now().UnixNano())) + if err := file.MkdirAll(gen1, fs.ModePerm); err != nil { + t.Errorf("failed to create gen1 dir: %v", err) + } + }, + checkFunc: func(w want, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) + } + files, err := file.ListInDir(brokenDir) + if err != nil { + return err + } + if len(files) != 1 { + return fmt.Errorf("only one generation should be in broken dir") + } + + broken, err := file.ListInDir(files[0]) + if err != nil { + return err + } + if len(broken) == 0 { + return fmt.Errorf("failed to move broken index files") + } + return nil + }, + } + }(), + func() test { + tmpDir := t.TempDir() + originDir := filepath.Join(tmpDir, originIndexDirName) + brokenDir := filepath.Join(tmpDir, brokenIndexDirName) + testIndexDir := testdata.GetTestdataPath(testdata.ValidIndex) + config := defaultConfig + config.BrokenIndexHistoryLimit = 0 + return test{ + name: "New does not backup when history limit is 0", + args: args{ + cfg: &config, + opts: []Option{ + WithIndexPath(tmpDir), + }, + }, + want: want{ + err: nil, + }, + beforeFunc: func(t *testing.T, args args) { + t.Helper() + if err := file.MkdirAll(originDir, fs.ModePerm); err != nil { + t.Errorf("failed to create origin dir: %v", err) + } + file.CopyDir(context.Background(), testIndexDir, originDir) + // remove metadata.json to make it broken + if err := os.Remove(filepath.Join(originDir, "metadata.json")); err != nil { + t.Errorf("failed to remove index file: %v", err) + } + }, + checkFunc: func(w want, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) + } + files, err := file.ListInDir(brokenDir) + if err != nil { + return err + } + if len(files) != 0 { + return fmt.Errorf("backup should not happen") + } + return nil + }, + } + }(), + func() test { + return test{ + name: "New fails with not existing index path", + args: args{ + cfg: &defaultConfig, + opts: []Option{ + WithIndexPath("/dev/null/ghost"), + }, + }, + want: want{ + err: errors.ErrIndexPathNotExists("/dev/null/ghost"), + }, } }(), } @@ -127,70 +509,175 @@ func TestNew(t *testing.T) { } } -func Test_ngt_prepareFolders(t *testing.T) { - type args struct{} - type want struct { - err error +func Test_needsBackup(t *testing.T) { + type args struct { + path string } - type fields struct { - enableCopyOnWrite bool - path string + type want struct { + need bool } type test struct { name string args args - fields fields want want - checkFunc func(want, error) error + checkFunc func(want, bool) error beforeFunc func(*testing.T, args) afterFunc func(*testing.T, args) } - defaultCheckFunc := func(w want, err error) error { - if !errors.Is(err, w.err) { - return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) + defaultCheckFunc := func(w want, need bool) error { + if need != w.need { + return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", need, w.need) } return nil } - tests := []test{ func() test { tmpDir := t.TempDir() + validIndexDir := testdata.GetTestdataPath(testdata.ValidIndex) return test{ - name: "success to create origin and backup dir", - args: args{}, - fields: fields{ - enableCopyOnWrite: true, - path: tmpDir, + name: "returns false when it's an initaial state", + args: args{ + path: tmpDir, }, want: want{ - err: nil, + need: false, }, - checkFunc: func(w want, err error) error { - if !errors.Is(err, w.err) { - return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) + beforeFunc: func(t *testing.T, a args) { + t.Helper() + if err := file.CopyDir(context.Background(), validIndexDir, tmpDir); err != nil { + t.Errorf("failed to copy index files: %v", err) } - dirs, err := file.ListInDir(tmpDir) + + // remove .json and .kvsdb files to simulate an initial state + files, err := file.ListInDir(tmpDir) if err != nil { - return err + t.Errorf("failed to list index files: %v", err) + } + for _, file := range files { + if strings.HasSuffix(file, ".json") || strings.HasSuffix(file, ".kvsdb") { + if err := os.Remove(file); err != nil { + t.Errorf("failed to remove index file: %v", err) + } + } + } + }, + } + }(), + func() test { + tmpDir := t.TempDir() + validIndexDir := testdata.GetTestdataPath(testdata.ValidIndex) + return test{ + name: "returns true when there's index files but no metadata.json", + args: args{ + path: tmpDir, + }, + want: want{ + need: true, + }, + beforeFunc: func(t *testing.T, a args) { + t.Helper() + if err := file.CopyDir(context.Background(), validIndexDir, tmpDir); err != nil { + t.Errorf("failed to copy index files: %v", err) } - // extract folder name from dir path into a map - dirMap := make(map[string]struct{}, len(dirs)) - for _, dir := range dirs { - // extract folder name from dir path - dir = dir[len(tmpDir)+1:] - dirMap[dir] = struct{}{} + // remove metadata.json + metafile := filepath.Join(tmpDir, "metadata.json") + if err := os.Remove(metafile); err != nil { + t.Errorf("failed to remove metadata.json: %v", err) + } + }, + } + }(), + func() test { + tmpDir := t.TempDir() + validIndexDir := testdata.GetTestdataPath(testdata.ValidIndex) + return test{ + name: "returns true when mets.IsInvalid is true", + args: args{ + path: tmpDir, + }, + want: want{ + need: true, + }, + beforeFunc: func(t *testing.T, a args) { + t.Helper() + if err := file.CopyDir(context.Background(), validIndexDir, tmpDir); err != nil { + t.Errorf("failed to copy index files: %v", err) } - // check if the dirs slice contains origin or backup. - // if the dirs slice contains both origin and backup, it is an error. - if _, ok := dirMap[originIndexDirName]; !ok { - return fmt.Errorf("failed to create origin dir") + // change IsInvalid in metadata.json + metafile := filepath.Join(tmpDir, "metadata.json") + meta, err := metadata.Load(metafile) + if err != nil { + t.Errorf("failed to load metadata.json: %v", err) } - if _, ok := dirMap[oldIndexDirName]; !ok { - return fmt.Errorf("failed to create backup dir") + meta.IsInvalid = true + meta.NGT.IndexCount = 0 + if err := metadata.Store(metafile, meta); err != nil { + t.Errorf("failed to store metadata.json: %v", err) + } + }, + } + }(), + func() test { + tmpDir := t.TempDir() + validIndexDir := testdata.GetTestdataPath(testdata.ValidIndex) + return test{ + name: "returns true when mets.IsInvalid is true", + args: args{ + path: tmpDir, + }, + want: want{ + need: true, + }, + beforeFunc: func(t *testing.T, a args) { + t.Helper() + if err := file.CopyDir(context.Background(), validIndexDir, tmpDir); err != nil { + t.Errorf("failed to copy index files: %v", err) + } + + // change NGT.IndexCount in metadata.json + metafile := filepath.Join(tmpDir, "metadata.json") + meta, err := metadata.Load(metafile) + if err != nil { + t.Errorf("failed to load metadata.json: %v", err) + } + meta.IsInvalid = false + meta.NGT.IndexCount = 100 + if err := metadata.Store(metafile, meta); err != nil { + t.Errorf("failed to store metadata.json: %v", err) + } + }, + } + }(), + func() test { + tmpDir := t.TempDir() + validIndexDir := testdata.GetTestdataPath(testdata.ValidIndex) + return test{ + name: "returns false when NGT.IndexCount is 0", + args: args{ + path: tmpDir, + }, + want: want{ + need: false, + }, + beforeFunc: func(t *testing.T, a args) { + t.Helper() + if err := file.CopyDir(context.Background(), validIndexDir, tmpDir); err != nil { + t.Errorf("failed to copy index files: %v", err) + } + + // change NGT.IndexCount in metadata.json + metafile := filepath.Join(tmpDir, "metadata.json") + meta, err := metadata.Load(metafile) + if err != nil { + t.Errorf("failed to load metadata.json: %v", err) + } + meta.IsInvalid = false + meta.NGT.IndexCount = 0 + if err := metadata.Store(metafile, meta); err != nil { + t.Errorf("failed to store metadata.json: %v", err) } - return nil }, } }(), @@ -210,12 +697,8 @@ func Test_ngt_prepareFolders(t *testing.T) { if test.checkFunc == nil { checkFunc = defaultCheckFunc } - n := &ngt{ - enableCopyOnWrite: test.fields.enableCopyOnWrite, - path: test.fields.path, - } - err := n.prepareFolders() - if err := checkFunc(test.want, err); err != nil { + need := needsBackup(test.args.path) + if err := checkFunc(test.want, need); err != nil { tt.Errorf("error = %v", err) } })