Skip to content

Commit

Permalink
Add tests for index information export
Browse files Browse the repository at this point in the history
  • Loading branch information
ykadowak committed Feb 22, 2024
1 parent 668a5c1 commit a4a7767
Show file tree
Hide file tree
Showing 7 changed files with 320 additions and 20 deletions.
12 changes: 8 additions & 4 deletions internal/k8s/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,24 +181,28 @@ func (*client) LabelSelector(key string, op selection.Operator, vals []string) (
return labels.NewSelector().Add(*requirements), nil
}

type Patcher struct {
type Patcher interface {
ApplyPodAnnotations(ctx context.Context, name, namespace string, entries map[string]string) error
}

type patcher struct {
client Client
fieldManager string
}

func NewPatcher(fieldManager string) (Patcher, error) {
client, err := New()
if err != nil {
return Patcher{}, err
return nil, err

Check warning on line 196 in internal/k8s/client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/client/client.go#L196

Added line #L196 was not covered by tests
}

return Patcher{
return &patcher{

Check warning on line 199 in internal/k8s/client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/client/client.go#L199

Added line #L199 was not covered by tests
client: client,
fieldManager: fieldManager,
}, nil
}

func (s *Patcher) ApplyPodAnnotations(ctx context.Context, name, namespace string, entries map[string]string) error {
func (s *patcher) ApplyPodAnnotations(ctx context.Context, name, namespace string, entries map[string]string) error {

Check warning on line 205 in internal/k8s/client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/client/client.go#L205

Added line #L205 was not covered by tests
var podList corev1.PodList
if err := s.client.List(ctx, &podList, &cli.ListOptions{
Namespace: namespace,
Expand Down
11 changes: 11 additions & 0 deletions internal/test/mock/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,14 @@ func (m *ValdK8sClientMock) LabelSelector(key string, op selection.Operator, val
args := m.Called(key, op, vals)
return args.Get(0).(labels.Selector), args.Error(1)
}

type PatcherMock struct {
mock.Mock
}

var _ client.Patcher = (*PatcherMock)(nil)

func (m *PatcherMock) ApplyPodAnnotations(ctx context.Context, name, namespace string, entries map[string]string) error {
args := m.Called(ctx, name, namespace, entries)
return args.Error(0)
}
17 changes: 17 additions & 0 deletions pkg/agent/core/ngt/service/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (C) 2019-2024 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package service

var ExportMetricsOnTick = (*ngt).exportMetricsOnTick
27 changes: 14 additions & 13 deletions pkg/agent/core/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ import (
"github.com/vdaas/vald/pkg/agent/internal/vqueue"
)

type contextSaveIndexTimeKey string

type NGT interface {
Start(ctx context.Context) <-chan error
Search(ctx context.Context, vec []float32, size uint32, epsilon, radius float32) (*payload.Search_Response, error)
Expand Down Expand Up @@ -161,12 +163,14 @@ const (
originIndexDirName = "origin"
brokenIndexDirName = "broken"

fieldManager = "vald-agent-index-controller"
uncommittedAnnotationsKey = "vald.vdaas.org/uncommitted"
unsavedProcessedVqAnnotationsKey = "vald.vdaas.org/unsaved-processed-vq"
unsavedCreateIndexExecutionNumAnnotationsKey = "vald.vdaas.org/unsaved-create-index-execution"
lastTimeSaveIndexTimestampAnnotationsKey = "vald.vdaas.org/last-time-save-index-timestamp"
indexCountAnnotationsKey = "vald.vdaas.org/index-count"

// use this only for tests. usually just leave the ctx value empty and let time.Now() be used
saveIndexTimeKey contextSaveIndexTimeKey = "saveIndexTimeKey"
)

func New(cfg *config.NGT, opts ...Option) (nn NGT, err error) {
Expand Down Expand Up @@ -233,14 +237,6 @@ func New(cfg *config.NGT, opts ...Option) (nn NGT, err error) {
n.indexing.Store(false)
n.saving.Store(false)

if n.enableExportIndexInfo {
patcher, err := client.NewPatcher(fieldManager)
if err != nil {
return nil, fmt.Errorf("failed to create pacher: %w", err)
}
n.patcher = patcher
}

return n, nil
}

Expand Down Expand Up @@ -1864,9 +1860,8 @@ func (n *ngt) unsavedNumberOfCreateIndexExecutionEntry() (k, v string) {
return unsavedCreateIndexExecutionNumAnnotationsKey, strconv.FormatUint(num, 10)
}

func (n *ngt) lastTimeSaveIndexTimestampEntry() (k, v string) {
timestamp := time.Now().UTC().Format(time.RFC3339)
return lastTimeSaveIndexTimestampAnnotationsKey, timestamp
func (n *ngt) lastTimeSaveIndexTimestampEntry(timestamp time.Time) (k, v string) {
return lastTimeSaveIndexTimestampAnnotationsKey, timestamp.UTC().Format(time.RFC3339)
}

func (n *ngt) indexCountEntry() (k, v string) {
Expand Down Expand Up @@ -1904,7 +1899,13 @@ func (n *ngt) exportMetricsOnCreateIndex(ctx context.Context) error {
func (n *ngt) exportMetricsOnSaveIndex(ctx context.Context) error {
entries := make(map[string]string)

k, v := n.lastTimeSaveIndexTimestampEntry()
val := ctx.Value(saveIndexTimeKey)
t, ok := val.(time.Time)
if !ok {
t = time.Now()
}

Check warning on line 1906 in pkg/agent/core/ngt/service/ngt.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/service/ngt.go#L1905-L1906

Added lines #L1905 - L1906 were not covered by tests

k, v := n.lastTimeSaveIndexTimestampEntry(t)
entries[k] = v

k, v = n.unsavedNumberOfCreateIndexExecutionEntry()
Expand Down
237 changes: 237 additions & 0 deletions pkg/agent/core/ngt/service/ngt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ import (
testdata "github.com/vdaas/vald/internal/test"
"github.com/vdaas/vald/internal/test/data/vector"
"github.com/vdaas/vald/internal/test/goleak"
"github.com/vdaas/vald/internal/test/mock/k8s"
"github.com/vdaas/vald/internal/test/testify"
"github.com/vdaas/vald/pkg/agent/internal/kvs"
"github.com/vdaas/vald/pkg/agent/internal/metadata"
"github.com/vdaas/vald/pkg/agent/internal/vqueue"
Expand Down Expand Up @@ -1126,6 +1128,241 @@ func Test_ngt_Close(t *testing.T) {
}
}

func TestExportIndexInfo(t *testing.T) {
t.Parallel()
config := defaultConfig
config.Dimension = 3
config.EnableExportIndexInfoToK8s = true
config.PodName = "test-pod"

type test struct {
name string
testfunc func(t *testing.T)
}

tests := []test{
{
"export after create index one vector",
func(t *testing.T) {
mock := &k8s.PatcherMock{}
mock.On("ApplyPodAnnotations",
testify.Anything,
testify.Anything,
testify.Anything,
testify.Anything,
).Return(nil)

ngt, err := New(&config, WithPatcher(mock))
require.NoError(t, err)

now := time.Now().UnixNano()
err = ngt.InsertWithTime("test-uuid", []float32{1.0, 2.0, 3.0}, now)
require.NoError(t, err)

err = ngt.CreateIndex(context.Background(), 10)
require.NoError(t, err)

// expected entries
expected := map[string]string{
indexCountAnnotationsKey: "1",
uncommittedAnnotationsKey: "0",
unsavedCreateIndexExecutionNumAnnotationsKey: "1",
unsavedProcessedVqAnnotationsKey: "1",
}
// check mock called result
mock.AssertExpectations(t)
mock.AssertNumberOfCalls(t, "ApplyPodAnnotations", 1)
mock.AssertCalled(t, "ApplyPodAnnotations", testify.Anything, config.PodName, config.PodNamespace, expected)
},
},
{
"export after create index multiple vectors",
func(t *testing.T) {
mock := &k8s.PatcherMock{}
mock.On("ApplyPodAnnotations",
testify.Anything,
testify.Anything,
testify.Anything,
testify.Anything,
).Return(nil)

ngt, err := New(&config, WithPatcher(mock))
require.NoError(t, err)

time1 := time.Now().UnixNano()
err = ngt.InsertWithTime("test-uuid", []float32{1.0, 2.0, 3.0}, time1)
require.NoError(t, err)

time2 := time.Now().UnixNano()
err = ngt.InsertWithTime("test-uuid2", []float32{1.0, 2.0, 3.0}, time2)
require.NoError(t, err)

err = ngt.CreateIndex(context.Background(), 10)
require.NoError(t, err)

// expected entries
expected := map[string]string{
indexCountAnnotationsKey: "2",
uncommittedAnnotationsKey: "0",
unsavedCreateIndexExecutionNumAnnotationsKey: "1",
unsavedProcessedVqAnnotationsKey: "2",
}
// check mock called result
mock.AssertExpectations(t)
mock.AssertNumberOfCalls(t, "ApplyPodAnnotations", 1)
mock.AssertCalled(t, "ApplyPodAnnotations", testify.Anything, config.PodName, config.PodNamespace, expected)
},
},
{
"export after create index multiple times",
func(t *testing.T) {
mock := &k8s.PatcherMock{}
mock.On("ApplyPodAnnotations",
testify.Anything,
testify.Anything,
testify.Anything,
testify.Anything,
).Return(nil)

ngt, err := New(&config, WithPatcher(mock))
require.NoError(t, err)

time1 := time.Now().UnixNano()
err = ngt.InsertWithTime("test-uuid", []float32{1.0, 2.0, 3.0}, time1)
require.NoError(t, err)

err = ngt.CreateIndex(context.Background(), 10)
require.NoError(t, err)

time2 := time.Now().UnixNano()
err = ngt.InsertWithTime("test-uuid2", []float32{1.0, 2.0, 3.0}, time2)
require.NoError(t, err)

err = ngt.CreateIndex(context.Background(), 10)
require.NoError(t, err)

// expected entries
expected := map[string]string{
indexCountAnnotationsKey: "2",
uncommittedAnnotationsKey: "0",
unsavedCreateIndexExecutionNumAnnotationsKey: "2",
unsavedProcessedVqAnnotationsKey: "2",
}
// check mock called result
mock.AssertExpectations(t)
mock.AssertNumberOfCalls(t, "ApplyPodAnnotations", 2)
mock.AssertCalled(t, "ApplyPodAnnotations", testify.Anything, config.PodName, config.PodNamespace, expected)
},
},
{
"export after create index multiple vectors and save index",
func(t *testing.T) {
mock := &k8s.PatcherMock{}
mock.On("ApplyPodAnnotations",
testify.Anything,
testify.Anything,
testify.Anything,
testify.Anything,
).Return(nil)

tmpdir := t.TempDir()

ngt, err := New(&config,
WithIndexPath(tmpdir),
WithPatcher(mock),
)
require.NoError(t, err)

time1 := time.Now().UnixNano()
err = ngt.InsertWithTime("test-uuid", []float32{1.0, 2.0, 3.0}, time1)
require.NoError(t, err)

time2 := time.Now().UnixNano()
err = ngt.InsertWithTime("test-uuid2", []float32{1.0, 2.0, 3.0}, time2)
require.NoError(t, err)

ctx := context.Background()
err = ngt.CreateIndex(ctx, 10)
require.NoError(t, err)

// set time in context for testing
saveIndexTime := time.Now()
ctx = context.WithValue(ctx, saveIndexTimeKey, saveIndexTime)

err = ngt.SaveIndex(ctx)
require.NoError(t, err)

// expected entries
expectedAfterCreate := map[string]string{
indexCountAnnotationsKey: "2",
uncommittedAnnotationsKey: "0",
unsavedCreateIndexExecutionNumAnnotationsKey: "1",
unsavedProcessedVqAnnotationsKey: "2",
}
expectedAfterSave := map[string]string{
lastTimeSaveIndexTimestampAnnotationsKey: saveIndexTime.UTC().Format(time.RFC3339),
unsavedCreateIndexExecutionNumAnnotationsKey: "0",
unsavedProcessedVqAnnotationsKey: "0",
}
// check mock called result
mock.AssertExpectations(t)
mock.AssertNumberOfCalls(t, "ApplyPodAnnotations", 2)
mock.AssertCalled(t, "ApplyPodAnnotations", testify.Anything, config.PodName, config.PodNamespace, expectedAfterCreate)
mock.AssertCalled(t, "ApplyPodAnnotations", testify.Anything, config.PodName, config.PodNamespace, expectedAfterSave)
},
},
{
"export after inserting vectors",
func(t *testing.T) {
mock := &k8s.PatcherMock{}
mock.On("ApplyPodAnnotations",
testify.Anything,
testify.Anything,
testify.Anything,
testify.Anything,
).Return(nil)

tmpdir := t.TempDir()

n, err := New(&config,
WithIndexPath(tmpdir),
WithPatcher(mock),
)
require.NoError(t, err)

time1 := time.Now().UnixNano()
err = n.InsertWithTime("test-uuid", []float32{1.0, 2.0, 3.0}, time1)
require.NoError(t, err)

time2 := time.Now().UnixNano()
err = n.InsertWithTime("test-uuid2", []float32{1.0, 2.0, 3.0}, time2)
require.NoError(t, err)

ctx := context.Background()
ExportMetricsOnTick(n.(*ngt), ctx)

// expected entries
expectedAfterInsert := map[string]string{
indexCountAnnotationsKey: "0",
uncommittedAnnotationsKey: "2",
}
// check mock called result
mock.AssertExpectations(t)
mock.AssertNumberOfCalls(t, "ApplyPodAnnotations", 1)
mock.AssertCalled(t, "ApplyPodAnnotations", testify.Anything, config.PodName, config.PodNamespace, expectedAfterInsert)
},
},
}
for _, tc := range tests {
test := tc
t.Run(test.name, func(tt *testing.T) {
tt.Parallel()
defer goleak.VerifyNone(tt, goleak.IgnoreCurrent())
test.testfunc(tt)
})
}
}

func Test_ngt_InsertUpsert(t *testing.T) {
if testing.Short() {
t.Skip("The execution of this test takes a lot of time, so it is not performed during the short test\ttest: Test_ngt_InsertUpsert")
Expand Down
Loading

0 comments on commit a4a7767

Please sign in to comment.