Skip to content

Commit

Permalink
[REVERT THIS]
Browse files Browse the repository at this point in the history
  • Loading branch information
ykadowak committed Feb 5, 2024
1 parent 2a5261c commit fe556b6
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 7 deletions.
6 changes: 6 additions & 0 deletions charts/vald/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2028,6 +2028,12 @@ agent:
annotations: {}
# @schema {"name": "agent.ngt", "type": "object"}
ngt:
# @schema {"name": "agent.ngt.pod_name", "type": "string"}
# agent.ngt.pod_name -- pod name of myself
pod_name: _MY_POD_NAME_
# @schema {"name": "agent.ngt.namespace", "type": "string"}
# agent.ngt.namespace -- namespace of myself
namespace: _MY_POD_NAMESPACE_
# @schema {"name": "agent.ngt.index_path", "type": "string"}
# agent.ngt.index_path -- path to index data
index_path: ""
Expand Down
2 changes: 2 additions & 0 deletions cmd/agent/core/ngt/sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ observability:
service_name: "vald-agent-ngt"
buffer_max_count: 10
ngt:
pod_name: "vald-agent-ngt-0" # this might overwrite k8s resource of agent pod 0
namespace: "default"
auto_create_index_pool_size: 10000
auto_index_check_duration: 30m
auto_index_duration_limit: 24h
Expand Down
8 changes: 8 additions & 0 deletions internal/config/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ package config

// NGT represent the ngt core configuration for server.
type NGT struct {
// PodName represent the ngt pod name
PodName string `yaml:"pod_name" json:"pod_name,omitempty"`

// PodNamespace represent the ngt pod namespace
PodNamespace string `yaml:"namespace" json:"namespace,omitempty"`

// IndexPath represent the ngt index file path
IndexPath string `yaml:"index_path" json:"index_path,omitempty"`

Expand Down Expand Up @@ -116,6 +122,8 @@ type VQueue struct {

// Bind returns NGT object whose some string value is filed value or environment value.
func (n *NGT) Bind() *NGT {
n.PodName = GetActualValue(n.PodName)
n.PodNamespace = GetActualValue(n.PodNamespace)
n.IndexPath = GetActualValue(n.IndexPath)
n.DistanceType = GetActualValue(n.DistanceType)
n.ObjectType = GetActualValue(n.ObjectType)
Expand Down
19 changes: 19 additions & 0 deletions internal/k8s/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import (
"fmt"

snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/watch"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
cli "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand All @@ -44,8 +46,12 @@ type (
CreateOptions = cli.CreateOptions
UpdateOptions = cli.UpdateOptions
MatchingLabels = cli.MatchingLabels
MatchingFields = cli.MatchingFields
InNamespace = cli.InNamespace
VolumeSnapshot = snapshotv1.VolumeSnapshot
PodList = corev1.PodList
Pod = corev1.Pod
PatchOptions = cli.PatchOptions
)

const (
Expand All @@ -54,6 +60,11 @@ const (
SelectionOpEquals = selection.Equals
)

var (
ServerSideApply = cli.Apply
PointerBool = pointer.Bool
)

type Client interface {
// Get retrieves an obj for the given object key from the Kubernetes Cluster.
// obj must be a struct pointer so that obj can be updated with the response
Expand All @@ -75,6 +86,10 @@ type Client interface {
// struct pointer so that obj can be updated with the content returned by the Server.
Update(ctx context.Context, obj Object, opts ...cli.UpdateOption) error

// Patch patches the given obj in the Kubernetes cluster. obj must be a
// struct pointer so that obj can be updated with the content returned by the Server.
Patch(ctx context.Context, obj Object, patch cli.Patch, opts ...cli.PatchOption) error

// Watch watches the given obj for changes and takes the appropriate callbacks.
Watch(ctx context.Context, obj cli.ObjectList, opts ...ListOption) (watch.Interface, error)

Expand Down Expand Up @@ -151,6 +166,10 @@ func (c *client) Update(ctx context.Context, obj Object, opts ...cli.UpdateOptio
return c.withWatch.Update(ctx, obj, opts...)
}

func (c *client) Patch(ctx context.Context, obj Object, patch cli.Patch, opts ...cli.PatchOption) error {
return c.withWatch.Patch(ctx, obj, patch, opts...)
}

func (c *client) Watch(ctx context.Context, obj cli.ObjectList, opts ...ListOption) (watch.Interface, error) {
return c.withWatch.Watch(ctx, obj, opts...)
}
Expand Down
109 changes: 102 additions & 7 deletions pkg/agent/core/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"reflect"
"runtime"
"slices"
"strconv"
"sync/atomic"
"time"

Expand All @@ -37,6 +38,7 @@ import (
core "github.com/vdaas/vald/internal/core/algorithm/ngt"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/file"
"github.com/vdaas/vald/internal/k8s/client"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/safety"
Expand All @@ -46,6 +48,12 @@ import (
"github.com/vdaas/vald/pkg/agent/core/ngt/service/kvs"
"github.com/vdaas/vald/pkg/agent/core/ngt/service/vqueue"
"github.com/vdaas/vald/pkg/agent/internal/metadata"

"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
kruntime "k8s.io/apimachinery/pkg/runtime"
applycorev1 "k8s.io/client-go/applyconfigurations/core/v1"
)

type NGT interface {
Expand Down Expand Up @@ -122,13 +130,15 @@ type ngt struct {
enableProactiveGC bool // if this value is true, agent component will purge GC memory more proactive
enableCopyOnWrite bool // if this value is true, agent component will write backup file using Copy on Write and saves old files to the old directory

path string // index path
smu sync.Mutex // save index lock
tmpPath atomic.Value // temporary index path for Copy on Write
oldPath string // old volume path
basePath string // index base directory for CoW
brokenPath string // backup broken index path
cowmu sync.Mutex // copy on write move lock
podName string
podNamespace string
path string // index path
smu sync.Mutex // save index lock
tmpPath atomic.Value // temporary index path for Copy on Write
oldPath string // old volume path
basePath string // index base directory for CoW
brokenPath string // backup broken index path
cowmu sync.Mutex // copy on write move lock

poolSize uint32 // default pool size
radius float32 // default radius
Expand All @@ -141,6 +151,7 @@ type ngt struct {
historyLimit int // the maximum generation number of broken index backup

isReadReplica bool
client client.Client
}

const (
Expand All @@ -154,7 +165,12 @@ const (
)

func New(cfg *config.NGT, opts ...Option) (nn NGT, err error) {
if cfg.PodName == "" {
return nil, errors.New("pod_name is empty. this must be set either from environment variable or from config file")
}
n := &ngt{
podName: cfg.PodName,
podNamespace: cfg.PodNamespace,
fmap: make(map[string]int64),
dim: cfg.Dimension,
enableProactiveGC: cfg.EnableProactiveGC,
Expand Down Expand Up @@ -210,6 +226,13 @@ func New(cfg *config.NGT, opts ...Option) (nn NGT, err error) {
}
n.indexing.Store(false)
n.saving.Store(false)

c, err := client.New()
if err != nil {
return nil, fmt.Errorf("failed to create kubernetes client: %w", err)
}
n.client = c

return n, nil
}

Expand Down Expand Up @@ -830,6 +853,10 @@ func (n *ngt) Start(ctx context.Context) <-chan error {
n.lim = math.MaxInt64
}

// FIXME: delete this
n.idelay = 0

// add initial delay before starting auto indexing
if n.idelay > 0 {
timer := time.NewTimer(n.idelay)
select {
Expand All @@ -844,9 +871,11 @@ func (n *ngt) Start(ctx context.Context) <-chan error {
tick := time.NewTicker(n.dur)
sTick := time.NewTicker(n.sdur)
limit := time.NewTicker(n.lim)
k8sTick := time.NewTicker(10 * time.Second) // FIXME: configからtimer設定
defer tick.Stop()
defer sTick.Stop()
defer limit.Stop()
defer k8sTick.Stop()
for {
err = nil
select {
Expand All @@ -865,6 +894,9 @@ func (n *ngt) Start(ctx context.Context) <-chan error {
err = n.CreateAndSaveIndex(ctx, n.poolSize)
case <-sTick.C:
err = n.SaveIndex(ctx)
case <-k8sTick.C:
log.Debug("k8sTick: flush index info to k8s resource")
err = n.exportMetricsToK8s(ctx)
}
if err != nil && err != errors.ErrUncommittedIndexNotFound {
ech <- err
Expand Down Expand Up @@ -1764,3 +1796,66 @@ func (n *ngt) toSearchResponse(sr []core.SearchResult) (res *payload.Search_Resp
}
return res, nil
}

func (n *ngt) exportMetricsToK8s(ctx context.Context) error {
// FIXME: define as const somewhere
fieldManager := "vald-agent-index-controller"

uncommitted := n.InsertVQueueBufferLen() + n.DeleteVQueueBufferLen()
log.Debugf("exporting metrics to k8s resource: uncommitted: %d", uncommitted)

var podList client.PodList
if err := n.client.List(ctx, &podList, &client.ListOptions{
Namespace: n.podNamespace,
FieldSelector: fields.OneTermEqualSelector("metadata.name", n.podName),
}); err != nil {
return err
}

if len(podList.Items) == 0 {
return errors.New("agent pod not found on exporting metrics")
}

if len(podList.Items) >= 2 {
return errors.New("multiple agent pods found on exporting metrics. pods with same name exist in the same namespace?")
}

// try server side apply
expectPod := applycorev1.Pod(n.podName, n.podNamespace).
WithAnnotations(n.metricsAnnotations())

obj, err := kruntime.DefaultUnstructuredConverter.ToUnstructured(expectPod)
if err != nil {
return err
}

pod := podList.Items[0]
log.Debugf("found pod: %s", pod.GetName())

curApplyConfig, err := applycorev1.ExtractPod(&pod, fieldManager)
if err != nil {
return err
}

if equality.Semantic.DeepEqual(expectPod, curApplyConfig) {
log.Debug("no change in pod spec")
return nil
}

patch := &unstructured.Unstructured{Object: obj}
if err := n.client.Patch(ctx, patch, client.ServerSideApply, &client.PatchOptions{
FieldManager: fieldManager,
Force: client.PointerBool(true),
}); err != nil {
return err
}

return nil
}

func (n *ngt) metricsAnnotations() map[string]string {
entries := make(map[string]string, 0)
entries["vald.agent.index.uncommitted"] = strconv.FormatUint(n.InsertVQueueBufferLen()+n.DeleteVQueueBufferLen(), 10)

return entries
}

0 comments on commit fe556b6

Please sign in to comment.