Skip to content

Commit

Permalink
Added snapshot timestamp annotations to read replica agent (#2428)
Browse files Browse the repository at this point in the history
* Add lastTimeRotateReplicaTimestampAnnotationsKey constant

* Use ptr package

* Update annotations in ngt.go and rotator.go

* Add labels to Pod struct and update annotations handling***

***Introduce vald package for k8s related constants***

***Implement rotateIfNeeded function to check if read replica rotation is needed

* Refactor

* set sample log level debug

* Use k8s timestamp

* Refactor

* Format

* Add MatchingLabels method to ValdK8sClientMock

* Format

* Fix test

* style: format code with Gofumpt and Prettier

This commit fixes the style issues introduced in 39b66fa according to the output
from Gofumpt and Prettier.

Details: #2428

* Rename constants

Co-authored-by: Yusuke Kato <[email protected]>

* Refactor rotateIfNeeded function to use label selector for listing deployments

* style: format code with Gofumpt and Prettier

This commit fixes the style issues introduced in 6ace761 according to the output
from Gofumpt and Prettier.

Details: #2428

* FIx deepsource

* Revert

---------

Co-authored-by: deepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com>
Co-authored-by: Yusuke Kato <[email protected]>
  • Loading branch information
3 people authored and vdaas-ci committed Mar 8, 2024
1 parent 12e79eb commit 47a4565
Show file tree
Hide file tree
Showing 14 changed files with 178 additions and 22 deletions.
2 changes: 2 additions & 0 deletions charts/vald/templates/index/operator/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,6 @@ data:
agent_name: {{ $agent.name }}
agent_namespace: {{ $agent.namespace }}
concurrency: 1
read_replica_enabled: {{ $agent.readreplica.enabled }}
read_replica_label_key: {{ $agent.readreplica.label_key }}
{{- end }}
2 changes: 1 addition & 1 deletion cmd/index/job/readreplica/rotate/sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ version: v0.0.0
time_zone: JST
logging:
format: raw
level: info
level: debug
logger: glg
server_config:
servers:
Expand Down
2 changes: 2 additions & 0 deletions cmd/index/operator/sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ operator:
agent_name: "vald-agent"
agent_namespace: "default"
concurrency: 1
read_replica_enabled: true
read_replica_label_key: "vald-readreplica-id"
observability:
enabled: false
otlp:
Expand Down
8 changes: 7 additions & 1 deletion internal/config/index_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.
package config

// IndexCreation represents the configurations for index creation.
// IndexOperator represents the configurations for index k8s operator.
type IndexOperator struct {
// AgentName represent agents meta_name for service discovery
AgentName string `json:"agent_name" yaml:"agent_name"`
Expand All @@ -23,6 +23,12 @@ type IndexOperator struct {

// Concurrency represents indexing concurrency.
Concurrency int `json:"concurrency" yaml:"concurrency"`

// ReadReplicaEnabled represents whether read replica is enabled or not.
ReadReplicaEnabled bool `json:"read_replica_enabled" yaml:"read_replica_enabled"`

// ReadReplicaLabelKey represents the label key for read replica.
ReadReplicaLabelKey string `json:"read_replica_label_key" yaml:"read_replica_label_key"`
}

func (ic *IndexOperator) Bind() *IndexOperator {
Expand Down
16 changes: 10 additions & 6 deletions internal/k8s/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ const (
WatchDeletedEvent = watch.Deleted
SelectionOpEquals = selection.Equals
SelectionOpExists = selection.Exists
PodIndexLabel = appsv1.PodIndexLabel
)

var (
Expand Down Expand Up @@ -99,6 +100,9 @@ type Client interface {
// Watch watches the given obj for changes and takes the appropriate callbacks.
Watch(ctx context.Context, obj cli.ObjectList, opts ...ListOption) (watch.Interface, error)

// MatchingLabels filters the list/delete operation on the given set of labels.
MatchingLabels(labels map[string]string) cli.MatchingLabels

// LabelSelector creates labels.Selector for Options like ListOptions.
LabelSelector(key string, op selection.Operator, vals []string) (labels.Selector, error)
}
Expand Down Expand Up @@ -173,6 +177,10 @@ func (c *client) Watch(ctx context.Context, obj cli.ObjectList, opts ...ListOpti
return c.withWatch.Watch(ctx, obj, opts...)
}

func (*client) MatchingLabels(labels map[string]string) cli.MatchingLabels {
return cli.MatchingLabels(labels)

Check warning on line 181 in internal/k8s/client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/client/client.go#L180-L181

Added lines #L180 - L181 were not covered by tests
}

func (*client) LabelSelector(key string, op selection.Operator, vals []string) (labels.Selector, error) {
requirements, err := labels.NewRequirement(key, op, vals)
if err != nil {
Expand Down Expand Up @@ -251,12 +259,8 @@ func (s *patcher) ApplyPodAnnotations(ctx context.Context, name, namespace strin
}

patch := &unstructured.Unstructured{Object: obj}
if err := s.client.Patch(ctx, patch, cli.Apply, &cli.PatchOptions{
return s.client.Patch(ctx, patch, cli.Apply, &cli.PatchOptions{

Check warning on line 262 in internal/k8s/client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/client/client.go#L262

Added line #L262 was not covered by tests
FieldManager: s.fieldManager,
Force: ptr.To(true),
}); err != nil {
return err
}

return nil
})

Check warning on line 265 in internal/k8s/client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/client/client.go#L265

Added line #L265 was not covered by tests
}
3 changes: 3 additions & 0 deletions internal/k8s/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type Pod struct {
CPURequest float64
MemLimit float64
MemRequest float64
Labels map[string]string
Annotations map[string]string
}

Expand Down Expand Up @@ -110,6 +111,7 @@ func (r *reconciler) Reconcile(ctx context.Context, _ reconcile.Request) (res re
pods = make(map[string][]Pod, len(ps.Items))
)

// skipcq: CRT-P0006

Check warning on line 114 in internal/k8s/pod/pod.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/pod/pod.go#L114

Added line #L114 was not covered by tests
for _, pod := range ps.Items {
if pod.GetObjectMeta().GetDeletionTimestamp() != nil ||
(r.namespace != "" && !strings.EqualFold(pod.GetNamespace(), r.namespace)) ||
Expand Down Expand Up @@ -151,6 +153,7 @@ func (r *reconciler) Reconcile(ctx context.Context, _ reconcile.Request) (res re
CPURequest: cpuRequest,
MemLimit: memLimit,
MemRequest: memRequest,
Labels: pod.GetLabels(),

Check warning on line 156 in internal/k8s/pod/pod.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/pod/pod.go#L156

Added line #L156 was not covered by tests
Annotations: pod.GetAnnotations(),
})
}
Expand Down
29 changes: 29 additions & 0 deletions internal/k8s/vald/annotations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
//
// Copyright (C) 2019-2024 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package vald

import "time"

const (
TimeFormat = time.RFC3339Nano
UncommittedAnnotationsKey = "vald.vdaas.org/uncommitted"
UnsavedProcessedVQAnnotationsKey = "vald.vdaas.org/unsaved-processed-vq"
UnsavedCreateIndexExecutionNumAnnotationsKey = "vald.vdaas.org/unsaved-create-index-execution"
LastTimeSaveIndexTimestampAnnotationsKey = "vald.vdaas.org/last-time-save-index-timestamp"
IndexCountAnnotationsKey = "vald.vdaas.org/index-count"
LastTimeSnapshotTimestampAnnotationsKey = "vald.vdaas.org/last-time-snapshot-timestamp"
)
7 changes: 6 additions & 1 deletion internal/test/mock/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type ValdK8sClientMock struct {

var _ client.Client = (*ValdK8sClientMock)(nil)

func (m *ValdK8sClientMock) Get(ctx context.Context, name string, namespace string, obj client.Object, opts ...crclient.GetOption) error {
func (m *ValdK8sClientMock) Get(ctx context.Context, name, namespace string, obj client.Object, opts ...crclient.GetOption) error {

Check warning on line 33 in internal/test/mock/k8s/client.go

View check run for this annotation

Codecov / codecov/patch

internal/test/mock/k8s/client.go#L33

Added line #L33 was not covered by tests
args := m.Called(ctx, name, namespace, obj, opts)
return args.Error(0)
}
Expand Down Expand Up @@ -65,6 +65,11 @@ func (m *ValdK8sClientMock) Watch(ctx context.Context, obj crclient.ObjectList,
return args.Get(0).(watch.Interface), args.Error(1)
}

func (m *ValdK8sClientMock) MatchingLabels(labels map[string]string) client.MatchingLabels {
args := m.Called(labels)
return args.Get(0).(client.MatchingLabels)

Check warning on line 70 in internal/test/mock/k8s/client.go

View check run for this annotation

Codecov / codecov/patch

internal/test/mock/k8s/client.go#L68-L70

Added lines #L68 - L70 were not covered by tests
}

func (m *ValdK8sClientMock) LabelSelector(key string, op selection.Operator, vals []string) (labels.Selector, error) {
args := m.Called(key, op, vals)
return args.Get(0).(labels.Selector), args.Error(1)
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/core/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/file"
"github.com/vdaas/vald/internal/k8s/client"
"github.com/vdaas/vald/internal/k8s/vald"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/safety"
Expand Down Expand Up @@ -2006,7 +2007,7 @@ func (n *ngt) unsavedNumberOfCreateIndexExecutionEntry() (k, v string) {
}

func (n *ngt) lastTimeSaveIndexTimestampEntry(timestamp time.Time) (k, v string) {
return lastTimeSaveIndexTimestampAnnotationsKey, timestamp.UTC().Format(time.RFC3339)
return lastTimeSaveIndexTimestampAnnotationsKey, timestamp.UTC().Format(vald.TimeFormat)
}

func (n *ngt) indexCountEntry() (k, v string) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/core/ngt/service/ngt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
core "github.com/vdaas/vald/internal/core/algorithm/ngt"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/file"
kvald "github.com/vdaas/vald/internal/k8s/vald"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/internal/strings"
Expand Down Expand Up @@ -1300,7 +1301,7 @@ func TestExportIndexInfo(t *testing.T) {
unsavedProcessedVqAnnotationsKey: "2",
}
expectedAfterSave := map[string]string{
lastTimeSaveIndexTimestampAnnotationsKey: saveIndexTime.UTC().Format(time.RFC3339),
lastTimeSaveIndexTimestampAnnotationsKey: saveIndexTime.UTC().Format(kvald.TimeFormat),
unsavedCreateIndexExecutionNumAnnotationsKey: "0",
unsavedProcessedVqAnnotationsKey: "0",
}
Expand Down
19 changes: 13 additions & 6 deletions pkg/index/job/readreplica/rotate/service/rotator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ import (
snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/k8s/client"
"github.com/vdaas/vald/internal/k8s/vald"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/internal/sync/errgroup"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"
)

const (
Expand Down Expand Up @@ -160,7 +161,7 @@ func (s *subProcess) rotate(ctx context.Context) error {
return err
}

err = s.updateDeployment(ctx, newPvc.GetName(), deployment)
err = s.updateDeployment(ctx, newPvc.GetName(), deployment, newSnap.CreationTimestamp.Time)

Check warning on line 164 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L164

Added line #L164 was not covered by tests
if err != nil {
log.Errorf("failed to update Deployment. removing the new snapshot(%s) and pvc(%s)...", newSnap.GetName(), newPvc.GetName())
if dperr := s.deletePVC(ctx, newPvc); dperr != nil {
Expand Down Expand Up @@ -211,7 +212,7 @@ func (s *subProcess) createSnapshot(ctx context.Context, deployment *appsv1.Depl
Kind: "Deployment",
Name: deployment.GetName(),
UID: deployment.GetUID(),
Controller: pointer.Bool(true),
Controller: ptr.To(true),

Check warning on line 215 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L215

Added line #L215 was not covered by tests
},
},
},
Expand Down Expand Up @@ -257,7 +258,7 @@ func (s *subProcess) createPVC(ctx context.Context, newSnapShot string, deployme
Kind: "Deployment",
Name: deployment.GetName(),
UID: deployment.GetUID(),
Controller: pointer.Bool(true),
Controller: ptr.To(true),

Check warning on line 261 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L261

Added line #L261 was not covered by tests
},
},
},
Expand Down Expand Up @@ -295,11 +296,17 @@ func (s *subProcess) getDeployment(ctx context.Context) (*appsv1.Deployment, err
return &list.Items[0], nil
}

func (s *subProcess) updateDeployment(ctx context.Context, newPVC string, deployment *appsv1.Deployment) error {
func (s *subProcess) updateDeployment(ctx context.Context, newPVC string, deployment *appsv1.Deployment, snapshotTime time.Time) error {

Check warning on line 299 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L299

Added line #L299 was not covered by tests
if deployment.Spec.Template.ObjectMeta.Annotations == nil {
deployment.Spec.Template.ObjectMeta.Annotations = map[string]string{}
}
deployment.Spec.Template.ObjectMeta.Annotations["kubectl.kubernetes.io/restartedAt"] = time.Now().UTC().Format(time.RFC3339)
now := time.Now().UTC().Format(time.RFC3339)
deployment.Spec.Template.ObjectMeta.Annotations["kubectl.kubernetes.io/restartedAt"] = now

if deployment.Annotations == nil {
deployment.Annotations = map[string]string{}
}
deployment.Annotations[vald.LastTimeSnapshotTimestampAnnotationsKey] = snapshotTime.UTC().Format(vald.TimeFormat)

Check warning on line 309 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L303-L309

Added lines #L303 - L309 were not covered by tests

for _, vol := range deployment.Spec.Template.Spec.Volumes {
if vol.Name == s.volumeName {
Expand Down
86 changes: 82 additions & 4 deletions pkg/index/operator/service/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@ package service

import (
"context"
"fmt"
"reflect"
"time"

"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/k8s"
"github.com/vdaas/vald/internal/k8s/client"
"github.com/vdaas/vald/internal/k8s/job"
"github.com/vdaas/vald/internal/k8s/pod"
"github.com/vdaas/vald/internal/k8s/vald"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/safety"
Expand All @@ -37,9 +41,12 @@ type Operator interface {
}

type operator struct {
ctrl k8s.Controller
eg errgroup.Group
namespace string
ctrl k8s.Controller
eg errgroup.Group
namespace string
client client.Client
readReplicaEnabled bool
readReplicaLabelKey string
}

// New returns Indexer object if no error occurs.
Expand Down Expand Up @@ -88,6 +95,13 @@ func New(agentName string, opts ...Option) (o Operator, err error) {
if err != nil {
return nil, err
}

client, err := client.New()
if err != nil {
return nil, err
}
operator.client = client

Check warning on line 104 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L99-L104

Added lines #L99 - L104 were not covered by tests
return operator, nil
}

Expand Down Expand Up @@ -127,15 +141,79 @@ func (o *operator) podOnReconcile(ctx context.Context, podList map[string][]pod.
for k, v := range podList {
for _, pod := range v {
log.Debug("key", k, "name:", pod.Name, "annotations:", pod.Annotations)

// rotate read replica if needed
if o.readReplicaEnabled {
if err := o.rotateIfNeeded(ctx, pod); err != nil {
log.Error(err)
}

Check warning on line 149 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L144-L149

Added lines #L144 - L149 were not covered by tests
}
}
}
}

// TODO: implement job reconcile logic to detect save job completion and to start rotation.
func (o *operator) jobOnReconcile(ctx context.Context, jobList map[string][]job.Job) {
func (*operator) jobOnReconcile(_ context.Context, jobList map[string][]job.Job) {

Check warning on line 156 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L156

Added line #L156 was not covered by tests
for k, v := range jobList {
// skipcq: CRT-P0006

Check warning on line 158 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L158

Added line #L158 was not covered by tests
for _, job := range v {
log.Debug("key", k, "name:", job.Name, "status:", job.Status)
}
}
}

// rotateIfNeeded starts rotation job when the condition meets.
// This function is work in progress.
func (o *operator) rotateIfNeeded(ctx context.Context, pod pod.Pod) error {
t, ok := pod.Annotations[vald.LastTimeSaveIndexTimestampAnnotationsKey]
if !ok {
log.Info("the agent pod has not saved index yet. skipping...")
return nil
}
lastSavedTime, err := time.Parse(vald.TimeFormat, t)
if err != nil {
return fmt.Errorf("parsing last time saved time: %w", err)
}

Check warning on line 176 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L167-L176

Added lines #L167 - L176 were not covered by tests

podIdx, ok := pod.Labels[client.PodIndexLabel]
if !ok {
log.Info("no index label found. the agent is not StatefulSet? skipping...")
return nil
}

Check warning on line 182 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L178-L182

Added lines #L178 - L182 were not covered by tests

var depList client.DeploymentList
selector, err := o.client.LabelSelector(o.readReplicaLabelKey, client.SelectionOpEquals, []string{podIdx})
if err != nil {
return fmt.Errorf("creating label selector: %w", err)
}
listOpts := client.ListOptions{
Namespace: o.namespace,
LabelSelector: selector,
}
if err := o.client.List(ctx, &depList, &listOpts); err != nil {
return err
}
if len(depList.Items) == 0 {
return errors.New("no readreplica deployment found")
}
dep := depList.Items[0]

annotations := dep.GetAnnotations()
t, ok = annotations[vald.LastTimeSnapshotTimestampAnnotationsKey]
if ok {
lastSnapshotTime, err := time.Parse(vald.TimeFormat, t)
if err != nil {
return fmt.Errorf("parsing last snapshot time: %w", err)
}

Check warning on line 207 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L184-L207

Added lines #L184 - L207 were not covered by tests

if lastSnapshotTime.After(lastSavedTime) {
log.Info("snapshot taken after the last save. skipping...")
return nil
}

Check warning on line 212 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L209-L212

Added lines #L209 - L212 were not covered by tests
}

log.Infof("rotation required for agent id: %s. creating rotator job...", podIdx)
// TODO: check if the rotator job already exists or queued
// then create rotation job
return nil

Check warning on line 218 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L215-L218

Added lines #L215 - L218 were not covered by tests
}
Loading

0 comments on commit 47a4565

Please sign in to comment.