Skip to content

Commit

Permalink
Add operator logic for rotation
Browse files Browse the repository at this point in the history
  • Loading branch information
ykadowak committed Mar 11, 2024
1 parent c3c9153 commit 844cca1
Show file tree
Hide file tree
Showing 10 changed files with 151 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,6 @@ data:
rotator:
agent_namespace: {{ $rotator.agent_namespace | quote }}
read_replica_label_key: {{ $agent.readreplica.label_key | quote }}
read_replica_id: "_MY_TARGET_REPLICA_ID_"
read_replica_id: "_{{ $rotator.target_read_replica_id_envname}}_"
volume_name: {{ $agent.readreplica.volume_name | quote }}
{{- end }}
2 changes: 2 additions & 0 deletions charts/vald/templates/index/operator/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#
{{- $operator := .Values.manager.index.operator -}}
{{- $agent := .Values.agent -}}
{{- $rotator := .Values.manager.index.readreplica.rotator -}}
{{- if $operator.enabled }}
apiVersion: v1
kind: ConfigMap
Expand Down Expand Up @@ -45,6 +46,7 @@ data:
namespace: _MY_POD_NAMESPACE_
agent_name: {{ $agent.name }}
agent_namespace: {{ $agent.namespace }}
rotator_name: {{ $rotator.name }}
concurrency: 1
read_replica_enabled: {{ $agent.readreplica.enabled }}
read_replica_label_key: {{ $agent.readreplica.label_key }}
Expand Down
6 changes: 3 additions & 3 deletions charts/vald/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3475,9 +3475,9 @@ manager:
# @schema {"name": "manager.index.readreplica.rotator.agent_namespace", "type": "string"}
# manager.index.readreplica.rotator.agent_namespace -- namespace of agent pods to manage
agent_namespace: _MY_POD_NAMESPACE_
# @schema {"name": "manager.index.readreplica.rotator.read_replica_id", "type": "string"}
# manager.index.readreplica.rotator.read_replica_id -- read replica id to perform rotation
read_replica_id: _MY_TARGET_REPLICA_ID_
# # @schema {"name": "manager.index.readreplica.rotator.target_read_replica_id_envname", "type": "string"}
# # manager.index.readreplica.rotator.target_read_replica_id_envname -- read replica id to perform rotation
target_read_replica_id_envname: MY_TARGET_REPLICA_ID
# @schema {"name": "manager.index.readreplica.rotator.serviceAccount", "type": "object"}
serviceAccount:
# @schema {"name": "manager.index.readreplica.rotator.serviceAccount.enabled", "type": "boolean"}
Expand Down
2 changes: 2 additions & 0 deletions cmd/index/operator/sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ operator:
namespace: "default"
agent_name: "vald-agent"
agent_namespace: "default"
rotator_name: "vald-readreplica-rotate"
target_read_replica_id_envname: MY_TARGET_REPLICA_ID
concurrency: 1
read_replica_enabled: true
read_replica_label_key: "vald-readreplica-id"
Expand Down
6 changes: 6 additions & 0 deletions internal/config/index_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ type IndexOperator struct {
// AgentNamespace represent agent namespace location
AgentNamespace string `json:"agent_namespace" yaml:"agent_namespace"`

// RotatorName represent rotator name for service discovery
RotatorName string `json:"rotator_name" yaml:"rotator_name"`

// TargetReadReplicaIDEnvname represents the environment variable name for target read replica id.
TargetReadReplicaIDEnvname string `json:"target_read_replica_id_envname" yaml:"target_read_replica_id_envname"`

// Concurrency represents indexing concurrency.
Concurrency int `json:"concurrency" yaml:"concurrency"`

Expand Down
2 changes: 2 additions & 0 deletions internal/k8s/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type (
ListOption = cli.ListOption
CreateOption = cli.CreateOption
CreateOptions = cli.CreateOptions
GetOption = cli.GetOption
GetOptions = cli.GetOptions
UpdateOptions = cli.UpdateOptions
MatchingLabels = cli.MatchingLabels
InNamespace = cli.InNamespace
Expand Down
8 changes: 8 additions & 0 deletions internal/k8s/pod/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package pod
import (
"context"

"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
)
Expand Down Expand Up @@ -84,3 +85,10 @@ func WithFields(fs map[string]string) Option {
return nil
}
}

func WithForOpts(fopts ...builder.ForOption) Option {
return func(r *reconciler) error {
r.forOpts = fopts
return nil
}

Check warning on line 93 in internal/k8s/pod/option.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/pod/option.go#L89-L93

Added lines #L89 - L93 were not covered by tests
}
5 changes: 3 additions & 2 deletions internal/k8s/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type reconciler struct {
onError func(err error)
onReconcile func(ctx context.Context, podList map[string][]Pod)
lopts []client.ListOption
forOpts []builder.ForOption
}

type Pod struct {
Expand Down Expand Up @@ -185,8 +186,8 @@ func (r *reconciler) NewReconciler(ctx context.Context, mgr manager.Manager) rec
return r
}

func (*reconciler) For() (client.Object, []builder.ForOption) {
return new(corev1.Pod), nil
func (r *reconciler) For() (client.Object, []builder.ForOption) {
return new(corev1.Pod), r.forOpts

Check warning on line 190 in internal/k8s/pod/pod.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/pod/pod.go#L189-L190

Added lines #L189 - L190 were not covered by tests
}

func (*reconciler) Owns() (client.Object, []builder.OwnsOption) {
Expand Down
133 changes: 122 additions & 11 deletions pkg/index/operator/service/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ import (
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/internal/sync/errgroup"

//FIXME:
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

const (
Expand All @@ -40,18 +48,22 @@ type Operator interface {
}

type operator struct {
ctrl k8s.Controller
eg errgroup.Group
namespace string
client client.Client
readReplicaEnabled bool
readReplicaLabelKey string
ctrl k8s.Controller
eg errgroup.Group
namespace string
client client.Client
rotatorName string
targetReadReplicaIDEnvName string
readReplicaEnabled bool
readReplicaLabelKey string
}

// New returns Indexer object if no error occurs.
func New(namespace, agentName string, opts ...Option) (o Operator, err error) {
func New(namespace, agentName, rotatorName, targetReadReplicaIDEnvName string, opts ...Option) (o Operator, err error) {

Check warning on line 62 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L62

Added line #L62 was not covered by tests
operator := new(operator)
operator.namespace = namespace
operator.targetReadReplicaIDEnvName = targetReadReplicaIDEnvName
operator.rotatorName = rotatorName

Check warning on line 66 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L64-L66

Added lines #L64 - L66 were not covered by tests
for _, opt := range append(defaultOpts, opts...) {
if err := opt(operator); err != nil {
oerr := errors.ErrOptionFailed(err, reflect.ValueOf(opt))
Expand All @@ -64,6 +76,10 @@ func New(namespace, agentName string, opts ...Option) (o Operator, err error) {
}
}

isAgent := func(pod *corev1.Pod) bool {
return pod.Labels["app"] == agentName
}

Check warning on line 81 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L79-L81

Added lines #L79 - L81 were not covered by tests

podController := pod.New(
pod.WithControllerName("pod reconciler for index operator"),
pod.WithOnErrorFunc(func(err error) {
Expand All @@ -74,6 +90,39 @@ func New(namespace, agentName string, opts ...Option) (o Operator, err error) {
pod.WithLabels(map[string]string{
"app": agentName,
}),
// To only reconcile for agent pods
pod.WithForOpts(
builder.WithPredicates(predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
pod, ok := e.Object.(*corev1.Pod)
if !ok {
return false
}
return isAgent(pod)

Check warning on line 101 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L96-L101

Added lines #L96 - L101 were not covered by tests
},
DeleteFunc: func(e event.DeleteEvent) bool {
pod, ok := e.Object.(*corev1.Pod)
if !ok {
return false
}
return isAgent(pod)

Check warning on line 108 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L103-L108

Added lines #L103 - L108 were not covered by tests
},
UpdateFunc: func(e event.UpdateEvent) bool {
pod, ok := e.ObjectNew.(*corev1.Pod)
if !ok {
return false
}
return isAgent(pod)

Check warning on line 115 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L110-L115

Added lines #L110 - L115 were not covered by tests
},
GenericFunc: func(e event.GenericEvent) bool {
pod, ok := e.Object.(*corev1.Pod)
if !ok {
return false
}
return isAgent(pod)

Check warning on line 122 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L117-L122

Added lines #L117 - L122 were not covered by tests
},
}),
),
)

operator.ctrl, err = k8s.New(
Expand Down Expand Up @@ -124,7 +173,6 @@ func (o *operator) Start(ctx context.Context) (<-chan error, error) {
return ech, nil
}

// TODO: implement agent pod reconcile logic to detect conditions to start indexing and saving.
func (o *operator) podOnReconcile(ctx context.Context, podList map[string][]pod.Pod) {
for k, v := range podList {
for _, pod := range v {
Expand Down Expand Up @@ -190,8 +238,71 @@ func (o *operator) rotateIfNeeded(ctx context.Context, pod pod.Pod) error {
}
}

log.Infof("rotation required for agent id: %s. creating rotator job...", podIdx)
// TODO: check if the rotator job already exists or queued
// then create rotation job
log.Infof("rotation required for agent(id: %s)", podIdx)
if err := o.createRotationJob(ctx, podIdx); err != nil {
return fmt.Errorf("creating rotation job: %w", err)
}
return nil

Check warning on line 245 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L241-L245

Added lines #L241 - L245 were not covered by tests
}

func (o *operator) createRotationJob(ctx context.Context, podIdx string) error {
var cronJob batchv1.CronJob
if err := o.client.Get(ctx, o.rotatorName, o.namespace, &cronJob); err != nil {
return err
}

Check warning on line 252 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L248-L252

Added lines #L248 - L252 were not covered by tests

// get all the rotation jobs and make sure the job is not running
var jobList batchv1.JobList
selector, err := o.client.LabelSelector("app", client.SelectionOpEquals, []string{o.rotatorName})
if err != nil {
return fmt.Errorf("creating label selector: %w", err)
}
if err := o.client.List(ctx, &jobList, &client.ListOptions{
Namespace: o.namespace,
LabelSelector: selector,
}); err != nil {
return fmt.Errorf("listing jobs: %w", err)
}
for _, job := range jobList.Items {
// no need to check finished jobs
if job.Status.Active == 0 {
continue

Check warning on line 269 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L255-L269

Added lines #L255 - L269 were not covered by tests
}

envs := job.Spec.Template.Spec.Containers[0].Env
// since latest append wins, checking backbards
for i := len(envs) - 1; i >= 0; i-- {
env := envs[i]
if env.Name == o.targetReadReplicaIDEnvName {
if env.Value == podIdx {
log.Infof("rotation job for the agent(id: %s) is already running. skipping...", podIdx)
return nil
} else {
break

Check warning on line 281 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L272-L281

Added lines #L272 - L281 were not covered by tests
}
}
}
}

// now we actually needs to create the rotator job
log.Info("no job is running to rotate the agent(id:%s). creating a new job...", podIdx)
spec := *cronJob.Spec.JobTemplate.Spec.DeepCopy()
spec.Template.Spec.Containers[0].Env = append(spec.Template.Spec.Containers[0].Env, corev1.EnvVar{
Name: o.targetReadReplicaIDEnvName,
Value: podIdx,
})

job := batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
GenerateName: cronJob.Name + "-",
Namespace: o.namespace,
},
Spec: spec,
}

if err := o.client.Create(ctx, &job); err != nil {
return err
}

Check warning on line 305 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L288-L305

Added lines #L288 - L305 were not covered by tests

return nil
}
2 changes: 2 additions & 0 deletions pkg/index/operator/usecase/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ func New(cfg *config.Data) (_ runner.Runner, err error) {
operator, err := service.New(
cfg.Operator.Namespace,

Check warning on line 45 in pkg/index/operator/usecase/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/usecase/operator.go#L45

Added line #L45 was not covered by tests
cfg.Operator.AgentName,
cfg.Operator.RotatorName,
cfg.Operator.TargetReadReplicaIDEnvname,

Check warning on line 48 in pkg/index/operator/usecase/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/usecase/operator.go#L47-L48

Added lines #L47 - L48 were not covered by tests
service.WithReadReplicaEnabled(cfg.Operator.ReadReplicaEnabled),
service.WithReadReplicaLabelKey(cfg.Operator.ReadReplicaLabelKey),
)
Expand Down

0 comments on commit 844cca1

Please sign in to comment.