Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove vcache for vald agent due to vcache delete timing control failure and time ordered concurrent vector queue called vqueue #1028

Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion charts/vald/values.schema.json

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions charts/vald/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2578,6 +2578,20 @@ agent:
# @schema {"name": "agent.ngt.enable_proactive_gc", "type": "boolean"}
# agent.ngt.enable_proactive_gc -- enable proactive GC call for reducing heap memory allocation
enable_proactive_gc: true
# @schema {"name": "agent.ngt.vqueue", "type": "object"}
vqueue:
# @schema {"name": "agent.ngt.vqueue.insert_buffer_size", "type": "integer"}
# agent.ngt.vqueue.insert_buffer_size -- insert channel buffer size
insert_buffer_size: 100
# @schema {"name": "agent.ngt.vqueue.delete_buffer_size", "type": "integer"}
# agent.ngt.vqueue.delete_buffer_size -- delete channel buffer size
delete_buffer_size: 100
# @schema {"name": "agent.ngt.vqueue.insert_buffer_pool_size", "type": "integer"}
# agent.ngt.vqueue.insert_buffer_pool_size -- insert slice pool buffer size
insert_buffer_pool_size: 10000
# @schema {"name": "agent.ngt.vqueue.delete_buffer_pool_size", "type": "integer"}
# agent.ngt.vqueue.delete_buffer_pool_size -- delete slice pool buffer size
delete_buffer_pool_size: 5000
# @schema {"name": "agent.sidecar", "type": "object"}
sidecar:
# @schema {"name": "agent.sidecar.enabled", "type": "boolean"}
Expand Down
4 changes: 2 additions & 2 deletions cmd/agent/core/ngt/sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ ngt:
bulk_insert_chunk_size: 10
creation_edge_size: 20
default_epsilon: 0.01
default_pool_size: 10000
default_pool_size: 100
default_radius: -1
dimension: 6
dimension: 100
distance_type: l2
enable_in_memory_mode: true
enable_proactive_gc: true
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ replace (
github.com/docker/docker => github.com/moby/moby v20.10.4+incompatible
github.com/envoyproxy/protoc-gen-validate => github.com/envoyproxy/protoc-gen-validate v0.4.1
github.com/go-sql-driver/mysql => github.com/go-sql-driver/mysql v1.5.0
github.com/gocql/gocql => github.com/gocql/gocql v0.0.0-20210129204804-4364a4b9cfdd
github.com/gocql/gocql => github.com/gocql/gocql v0.0.0-20210301062520-a04dba85ed25
github.com/gogo/googleapis => github.com/gogo/googleapis v1.4.0
github.com/gogo/protobuf => github.com/gogo/protobuf v1.3.2
github.com/google/go-cmp => github.com/google/go-cmp v0.5.4
Expand All @@ -21,7 +21,7 @@ replace (
github.com/gophercloud/gophercloud => github.com/gophercloud/gophercloud v0.16.0
github.com/gorilla/websocket => github.com/gorilla/websocket v1.4.2
github.com/hailocab/go-hostpool => github.com/monzo/go-hostpool v0.0.0-20200724120130-287edbb29340
github.com/klauspost/compress => github.com/klauspost/compress v1.11.9-0.20210226154345-6381575b273f
github.com/klauspost/compress => github.com/klauspost/compress v1.11.9
github.com/kpango/glg => github.com/kpango/glg v1.5.1
github.com/tensorflow/tensorflow => github.com/tensorflow/tensorflow v2.1.2+incompatible
golang.org/x/crypto => golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83
Expand Down Expand Up @@ -76,7 +76,7 @@ require (
go.uber.org/zap v1.16.0
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20210227040730-b0d1d43c014d
golang.org/x/sys v0.0.0-20210301091718-77cc2087c03b
gonum.org/v1/hdf5 v0.0.0-20200504100616-496fefe91614
gonum.org/v1/plot v0.8.1
google.golang.org/api v0.40.0
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ github.com/go-redis/redis/v8 v8.6.0/go.mod h1:DQ9q4Rk2HtwkrwVrdgmphoOQDMfpvcd/nH
github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gocql/gocql v0.0.0-20210129204804-4364a4b9cfdd h1:j0VZcDIpU6sLBMNFUEvuwP4FdmEXbmaEq6qufApwbbo=
github.com/gocql/gocql v0.0.0-20210129204804-4364a4b9cfdd/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY=
github.com/gocql/gocql v0.0.0-20210301062520-a04dba85ed25 h1:RqyFvP1RF5WLMGn9VVCSTPzg15b7jvFwJeWmWujSA6Q=
github.com/gocql/gocql v0.0.0-20210301062520-a04dba85ed25/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY=
github.com/gocraft/dbr/v2 v2.7.1 h1:v/bqxEBTlzj9sDbFCmtO2K1M3foDnYtQn097VoMALCw=
github.com/gocraft/dbr/v2 v2.7.1/go.mod h1:+q/T5TMzrc6BWFbcSzj7ixaaBJkbDfRsy9gq7eMsU1s=
github.com/gogo/googleapis v1.4.0 h1:zgVt4UpGxcqVOw97aRGxT4svlcmdK35fynLNctY32zI=
Expand Down Expand Up @@ -295,8 +295,8 @@ github.com/jung-kurt/gofpdf v1.16.2 h1:jgbatWHfRlPYiK85qgevsZTHviWXKwB1TTiKdz5Pt
github.com/jung-kurt/gofpdf v1.16.2/go.mod h1:1hl7y57EsiPAkLbOwzpzqgx1A30nQCk/YmFV8S2vmK0=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.11.9-0.20210226154345-6381575b273f h1:Ci0TKv+M1ZYVcpHxft0R0GR2RQ6Y6v32JJ4FKmkKo38=
github.com/klauspost/compress v1.11.9-0.20210226154345-6381575b273f/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.11.9 h1:5OCMOdde1TCT2sookEuVeEZzA8bmRSFV3AwPDZAG8AA=
github.com/klauspost/compress v1.11.9/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down Expand Up @@ -686,8 +686,8 @@ golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210227040730-b0d1d43c014d h1:9fH9JvLNoSpsDWcXJ4dSE3lZW99Z3OCUZLr07g60U6o=
golang.org/x/sys v0.0.0-20210227040730-b0d1d43c014d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210301091718-77cc2087c03b h1:kHlr0tATeLRMEiZJu5CknOw/E8V6h69sXXQFGoPtjcc=
golang.org/x/sys v0.0.0-20210301091718-77cc2087c03b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
Expand Down
4 changes: 2 additions & 2 deletions internal/config/net_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func TestDialer_Bind(t *testing.T) {
}
}

func TestTCP_Bind(t *testing.T) {
func TestNet_Bind(t *testing.T) {
type fields struct {
DNS *DNS
Dialer *Dialer
Expand Down Expand Up @@ -267,7 +267,7 @@ func TestTCP_Bind(t *testing.T) {
}
}

func TestTCP_Opts(t *testing.T) {
func TestNet_Opts(t *testing.T) {
type fields struct {
DNS *DNS
Dialer *Dialer
Expand Down
61 changes: 41 additions & 20 deletions internal/config/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,64 +20,82 @@ package config
// NGT represent the ngt core configuration for server.
type NGT struct {
// IndexPath represent the ngt index file path
IndexPath string `yaml:"index_path" json:"index_path"`
IndexPath string `yaml:"index_path" json:"index_path,omitempty"`

// Dimension represent the ngt index dimension
Dimension int `yaml:"dimension" json:"dimension"`
Dimension int `yaml:"dimension" json:"dimension,omitempty"`

// BulkInsertChunkSize represent the bulk insert chunk size
BulkInsertChunkSize int `yaml:"bulk_insert_chunk_size" json:"bulk_insert_chunk_size"`
BulkInsertChunkSize int `yaml:"bulk_insert_chunk_size" json:"bulk_insert_chunk_size,omitempty"`

// DistanceType represent the ngt index distance type
DistanceType string `yaml:"distance_type" json:"distance_type"`
DistanceType string `yaml:"distance_type" json:"distance_type,omitempty"`

// ObjectType represent the ngt index object type float or int
ObjectType string `yaml:"object_type" json:"object_type"`
ObjectType string `yaml:"object_type" json:"object_type,omitempty"`

// CreationEdgeSize represent the index edge count
CreationEdgeSize int `yaml:"creation_edge_size" json:"creation_edge_size"`
CreationEdgeSize int `yaml:"creation_edge_size" json:"creation_edge_size,omitempty"`

// SearchEdgeSize represent the search edge size
SearchEdgeSize int `yaml:"search_edge_size" json:"search_edge_size"`
SearchEdgeSize int `yaml:"search_edge_size" json:"search_edge_size,omitempty"`

// AutoIndexDurationLimit represents auto indexing duration limit
AutoIndexDurationLimit string `yaml:"auto_index_duration_limit" json:"auto_index_duration_limit"`
AutoIndexDurationLimit string `yaml:"auto_index_duration_limit" json:"auto_index_duration_limit,omitempty"`

// AutoIndexCheckDuration represent checking loop duration about auto indexing execution
AutoIndexCheckDuration string `yaml:"auto_index_check_duration" json:"auto_index_check_duration"`
AutoIndexCheckDuration string `yaml:"auto_index_check_duration" json:"auto_index_check_duration,omitempty"`

// AutoSaveIndexDuration represent checking loop duration about auto save index execution
AutoSaveIndexDuration string `yaml:"auto_save_index_duration" json:"auto_save_index_duration"`
AutoSaveIndexDuration string `yaml:"auto_save_index_duration" json:"auto_save_index_duration,omitempty"`

// AutoIndexLength represent auto index length limit
AutoIndexLength int `yaml:"auto_index_length" json:"auto_index_length"`
AutoIndexLength int `yaml:"auto_index_length" json:"auto_index_length,omitempty"`

// InitialDelayMaxDuration represent maximum duration for initial delay
InitialDelayMaxDuration string `yaml:"initial_delay_max_duration" json:"initial_delay_max_duration"`
InitialDelayMaxDuration string `yaml:"initial_delay_max_duration" json:"initial_delay_max_duration,omitempty"`

// EnableInMemoryMode enables on memory ngt indexing mode
EnableInMemoryMode bool `yaml:"enable_in_memory_mode" json:"enable_in_memory_mode"`
EnableInMemoryMode bool `yaml:"enable_in_memory_mode" json:"enable_in_memory_mode,omitempty"`

// DefaultPoolSize represent default create index batch pool size
DefaultPoolSize uint32 `yaml:"default_pool_size" json:"default_pool_size"`
DefaultPoolSize uint32 `yaml:"default_pool_size" json:"default_pool_size,omitempty"`

// DefaultRadius represent default radius used for search
DefaultRadius float32 `yaml:"default_radius" json:"default_radius"`
DefaultRadius float32 `yaml:"default_radius" json:"default_radius,omitempty"`

// DefaultEpsilon represent default epsilon used for search
DefaultEpsilon float32 `yaml:"default_epsilon" json:"default_epsilon"`
DefaultEpsilon float32 `yaml:"default_epsilon" json:"default_epsilon,omitempty"`

// MinLoadIndexTimeout represents minimum duration of load index timeout
MinLoadIndexTimeout string `yaml:"min_load_index_timeout" json:"min_load_index_timeout"`
MinLoadIndexTimeout string `yaml:"min_load_index_timeout" json:"min_load_index_timeout,omitempty"`

// MaxLoadIndexTimeout represents maximum duration of load index timeout
MaxLoadIndexTimeout string `yaml:"max_load_index_timeout" json:"max_load_index_timeout"`
MaxLoadIndexTimeout string `yaml:"max_load_index_timeout" json:"max_load_index_timeout,omitempty"`

// LoadIndexTimeoutFactor represents a factor of load index timeout
LoadIndexTimeoutFactor string `yaml:"load_index_timeout_factor" json:"load_index_timeout_factor"`
LoadIndexTimeoutFactor string `yaml:"load_index_timeout_factor" json:"load_index_timeout_factor,omitempty"`

// EnableProactiveGC enables more proactive GC call for reducing heap memory allocation
EnableProactiveGC bool `yaml:"enable_proactive_gc" json:"enable_proactive_gc"`
EnableProactiveGC bool `yaml:"enable_proactive_gc" json:"enable_proactive_gc,omitempty"`

// VQueue represent the ngt vector queue buffer size
VQueue *VQueue `json:"vqueue,omitempty" yaml:"vqueue"`
}

// VQueue represent the ngt vector queue buffer size
type VQueue struct {
// InsertBufferSize represents insert channel buffer size
InsertBufferSize int `json:"insert_buffer_size,omitempty" yaml:"insert_buffer_size"`

// InsertBufferPoolSize represents insert time ordered slice buffer size
InsertBufferPoolSize int `json:"insert_buffer_pool_size,omitempty" yaml:"insert_buffer_pool_size"`

// DeleteBufferSize represents delete channel buffer size
DeleteBufferSize int `json:"delete_buffer_size,omitempty" yaml:"delete_buffer_size"`

// DeleteBufferPoolSize represents delete time ordered slice buffer size
DeleteBufferPoolSize int `json:"delete_buffer_pool_size,omitempty" yaml:"delete_buffer_pool_size"`
}

// Bind returns NGT object whose some string value is filed value or environment value.
Expand All @@ -92,5 +110,8 @@ func (n *NGT) Bind() *NGT {
n.MinLoadIndexTimeout = GetActualValue(n.MinLoadIndexTimeout)
n.MaxLoadIndexTimeout = GetActualValue(n.MaxLoadIndexTimeout)
n.LoadIndexTimeoutFactor = GetActualValue(n.LoadIndexTimeoutFactor)
if n.VQueue == nil {
n.VQueue = new(VQueue)
}
return n
}
9 changes: 8 additions & 1 deletion internal/config/ngt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestNGT_Bind(t *testing.T) {
AutoIndexLength int
InitialDelayMaxDuration string
EnableInMemoryMode bool
VQueue *VQueue
}
type want struct {
want *NGT
Expand Down Expand Up @@ -75,6 +76,7 @@ func TestNGT_Bind(t *testing.T) {
AutoIndexLength: 100,
InitialDelayMaxDuration: "1h",
EnableInMemoryMode: false,
VQueue: new(VQueue),
},
want: want{
want: &NGT{
Expand All @@ -91,6 +93,7 @@ func TestNGT_Bind(t *testing.T) {
AutoIndexLength: 100,
InitialDelayMaxDuration: "1h",
EnableInMemoryMode: false,
VQueue: new(VQueue),
},
},
},
Expand All @@ -110,6 +113,7 @@ func TestNGT_Bind(t *testing.T) {
AutoIndexLength: 100,
InitialDelayMaxDuration: "_initialDelayMaxDuration_",
EnableInMemoryMode: false,
VQueue: new(VQueue),
},
beforeFunc: func() {
_ = os.Setenv("indexPath", "config/ngt")
Expand Down Expand Up @@ -144,13 +148,16 @@ func TestNGT_Bind(t *testing.T) {
AutoIndexLength: 100,
InitialDelayMaxDuration: "1h",
EnableInMemoryMode: false,
VQueue: new(VQueue),
},
},
},
{
name: "returns NGT when all fields are empty",
want: want{
want: new(NGT),
want: &NGT{
VQueue: new(VQueue),
},
},
},
}
Expand Down
14 changes: 6 additions & 8 deletions internal/core/algorithm/ngt/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,16 +499,14 @@ func (n *ngt) Remove(id uint) error {
}

// BulkRemove removes multiple index from NGT index.
func (n *ngt) BulkRemove(ids ...uint) error {
n.mu.Lock()
defer n.mu.Unlock()
for _, id := range ids {
if C.ngt_remove_index(n.index, C.ObjectID(id), n.ebuf) == ErrorCode {
ne := n.ebuf
return n.newGoError(ne)
func (n *ngt) BulkRemove(ids ...uint) (errs error) {
for i, id := range ids {
err := n.Remove(id)
if err != nil {
errs = errors.Wrapf(errs, "bulkremove error detected index number: %d,\tid: %d\terr: %v", i, id, err)
}
}
return nil
return errs
}

// GetVector returns vector stored in NGT index.
Expand Down
20 changes: 20 additions & 0 deletions internal/errors/vqueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
//
// Copyright (C) 2019-2021 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

// Package errors provides error types and function
package errors

var ErrVQueueFinalizing = New("error vector queue is now finalizing...")
Loading