diff --git a/charts/vald/templates/index/job/readreplica/rotate/configmap.yaml b/charts/vald/templates/index/job/readreplica/rotate/configmap.yaml index b189f86dfa6..d6dcbf7b549 100644 --- a/charts/vald/templates/index/job/readreplica/rotate/configmap.yaml +++ b/charts/vald/templates/index/job/readreplica/rotate/configmap.yaml @@ -48,6 +48,6 @@ data: rotator: agent_namespace: {{ $rotator.agent_namespace | quote }} read_replica_label_key: {{ $agent.readreplica.label_key | quote }} - read_replica_id: "_MY_TARGET_REPLICA_ID_" + read_replica_id: "_{{ $rotator.target_read_replica_id_envname}}_" volume_name: {{ $agent.readreplica.volume_name | quote }} {{- end }} diff --git a/charts/vald/templates/index/operator/configmap.yaml b/charts/vald/templates/index/operator/configmap.yaml index c4e0acd9cb2..f314820096d 100644 --- a/charts/vald/templates/index/operator/configmap.yaml +++ b/charts/vald/templates/index/operator/configmap.yaml @@ -15,6 +15,7 @@ # {{- $operator := .Values.manager.index.operator -}} {{- $agent := .Values.agent -}} +{{- $rotator := .Values.manager.index.readreplica.rotator -}} {{- if $operator.enabled }} apiVersion: v1 kind: ConfigMap @@ -45,6 +46,7 @@ data: namespace: _MY_POD_NAMESPACE_ agent_name: {{ $agent.name }} agent_namespace: {{ $agent.namespace }} + rotator_name: {{ $rotator.name }} concurrency: 1 read_replica_enabled: {{ $agent.readreplica.enabled }} read_replica_label_key: {{ $agent.readreplica.label_key }} diff --git a/charts/vald/values.yaml b/charts/vald/values.yaml index 5cca7418fee..7775b9871c2 100644 --- a/charts/vald/values.yaml +++ b/charts/vald/values.yaml @@ -3475,9 +3475,9 @@ manager: # @schema {"name": "manager.index.readreplica.rotator.agent_namespace", "type": "string"} # manager.index.readreplica.rotator.agent_namespace -- namespace of agent pods to manage agent_namespace: _MY_POD_NAMESPACE_ - # @schema {"name": "manager.index.readreplica.rotator.read_replica_id", "type": "string"} - # manager.index.readreplica.rotator.read_replica_id -- read replica id to perform rotation - read_replica_id: _MY_TARGET_REPLICA_ID_ + # # @schema {"name": "manager.index.readreplica.rotator.target_read_replica_id_envname", "type": "string"} + # # manager.index.readreplica.rotator.target_read_replica_id_envname -- read replica id to perform rotation + target_read_replica_id_envname: MY_TARGET_REPLICA_ID # @schema {"name": "manager.index.readreplica.rotator.serviceAccount", "type": "object"} serviceAccount: # @schema {"name": "manager.index.readreplica.rotator.serviceAccount.enabled", "type": "boolean"} diff --git a/cmd/index/operator/sample.yaml b/cmd/index/operator/sample.yaml index a370f380904..3a952bd15db 100644 --- a/cmd/index/operator/sample.yaml +++ b/cmd/index/operator/sample.yaml @@ -72,6 +72,8 @@ operator: namespace: "default" agent_name: "vald-agent" agent_namespace: "default" + rotator_name: "vald-readreplica-rotate" + target_read_replica_id_envname: MY_TARGET_REPLICA_ID concurrency: 1 read_replica_enabled: true read_replica_label_key: "vald-readreplica-id" diff --git a/internal/config/index_operator.go b/internal/config/index_operator.go index 4939a38f71c..b0a0fe74e57 100644 --- a/internal/config/index_operator.go +++ b/internal/config/index_operator.go @@ -24,6 +24,12 @@ type IndexOperator struct { // AgentNamespace represent agent namespace location AgentNamespace string `json:"agent_namespace" yaml:"agent_namespace"` + // RotatorName represent rotator name for service discovery + RotatorName string `json:"rotator_name" yaml:"rotator_name"` + + // TargetReadReplicaIDEnvname represents the environment variable name for target read replica id. + TargetReadReplicaIDEnvname string `json:"target_read_replica_id_envname" yaml:"target_read_replica_id_envname"` + // Concurrency represents indexing concurrency. Concurrency int `json:"concurrency" yaml:"concurrency"` diff --git a/internal/k8s/client/client.go b/internal/k8s/client/client.go index 0798bca32ed..e9367c553dd 100644 --- a/internal/k8s/client/client.go +++ b/internal/k8s/client/client.go @@ -49,6 +49,8 @@ type ( ListOption = cli.ListOption CreateOption = cli.CreateOption CreateOptions = cli.CreateOptions + GetOption = cli.GetOption + GetOptions = cli.GetOptions UpdateOptions = cli.UpdateOptions MatchingLabels = cli.MatchingLabels InNamespace = cli.InNamespace diff --git a/internal/k8s/pod/option.go b/internal/k8s/pod/option.go index 3952d648f8d..b5b0139b4b6 100644 --- a/internal/k8s/pod/option.go +++ b/internal/k8s/pod/option.go @@ -20,6 +20,7 @@ package pod import ( "context" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" ) @@ -84,3 +85,10 @@ func WithFields(fs map[string]string) Option { return nil } } + +func WithForOpts(fopts ...builder.ForOption) Option { + return func(r *reconciler) error { + r.forOpts = fopts + return nil + } +} diff --git a/internal/k8s/pod/pod.go b/internal/k8s/pod/pod.go index 1229f1620ce..d413cdf61e8 100644 --- a/internal/k8s/pod/pod.go +++ b/internal/k8s/pod/pod.go @@ -42,6 +42,7 @@ type reconciler struct { onError func(err error) onReconcile func(ctx context.Context, podList map[string][]Pod) lopts []client.ListOption + forOpts []builder.ForOption } type Pod struct { @@ -185,8 +186,8 @@ func (r *reconciler) NewReconciler(ctx context.Context, mgr manager.Manager) rec return r } -func (*reconciler) For() (client.Object, []builder.ForOption) { - return new(corev1.Pod), nil +func (r *reconciler) For() (client.Object, []builder.ForOption) { + return new(corev1.Pod), r.forOpts } func (*reconciler) Owns() (client.Object, []builder.OwnsOption) { diff --git a/pkg/index/operator/service/operator.go b/pkg/index/operator/service/operator.go index a1a96b7fc61..a23e57996fe 100644 --- a/pkg/index/operator/service/operator.go +++ b/pkg/index/operator/service/operator.go @@ -28,6 +28,14 @@ import ( "github.com/vdaas/vald/internal/observability/trace" "github.com/vdaas/vald/internal/safety" "github.com/vdaas/vald/internal/sync/errgroup" + + //FIXME: + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" ) const ( @@ -40,18 +48,22 @@ type Operator interface { } type operator struct { - ctrl k8s.Controller - eg errgroup.Group - namespace string - client client.Client - readReplicaEnabled bool - readReplicaLabelKey string + ctrl k8s.Controller + eg errgroup.Group + namespace string + client client.Client + rotatorName string + targetReadReplicaIDEnvName string + readReplicaEnabled bool + readReplicaLabelKey string } // New returns Indexer object if no error occurs. -func New(namespace, agentName string, opts ...Option) (o Operator, err error) { +func New(namespace, agentName, rotatorName, targetReadReplicaIDEnvName string, opts ...Option) (o Operator, err error) { operator := new(operator) operator.namespace = namespace + operator.targetReadReplicaIDEnvName = targetReadReplicaIDEnvName + operator.rotatorName = rotatorName for _, opt := range append(defaultOpts, opts...) { if err := opt(operator); err != nil { oerr := errors.ErrOptionFailed(err, reflect.ValueOf(opt)) @@ -64,6 +76,10 @@ func New(namespace, agentName string, opts ...Option) (o Operator, err error) { } } + isAgent := func(pod *corev1.Pod) bool { + return pod.Labels["app"] == agentName + } + podController := pod.New( pod.WithControllerName("pod reconciler for index operator"), pod.WithOnErrorFunc(func(err error) { @@ -74,6 +90,39 @@ func New(namespace, agentName string, opts ...Option) (o Operator, err error) { pod.WithLabels(map[string]string{ "app": agentName, }), + // To only reconcile for agent pods + pod.WithForOpts( + builder.WithPredicates(predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { + pod, ok := e.Object.(*corev1.Pod) + if !ok { + return false + } + return isAgent(pod) + }, + DeleteFunc: func(e event.DeleteEvent) bool { + pod, ok := e.Object.(*corev1.Pod) + if !ok { + return false + } + return isAgent(pod) + }, + UpdateFunc: func(e event.UpdateEvent) bool { + pod, ok := e.ObjectNew.(*corev1.Pod) + if !ok { + return false + } + return isAgent(pod) + }, + GenericFunc: func(e event.GenericEvent) bool { + pod, ok := e.Object.(*corev1.Pod) + if !ok { + return false + } + return isAgent(pod) + }, + }), + ), ) operator.ctrl, err = k8s.New( @@ -124,7 +173,6 @@ func (o *operator) Start(ctx context.Context) (<-chan error, error) { return ech, nil } -// TODO: implement agent pod reconcile logic to detect conditions to start indexing and saving. func (o *operator) podOnReconcile(ctx context.Context, podList map[string][]pod.Pod) { for k, v := range podList { for _, pod := range v { @@ -190,8 +238,71 @@ func (o *operator) rotateIfNeeded(ctx context.Context, pod pod.Pod) error { } } - log.Infof("rotation required for agent id: %s. creating rotator job...", podIdx) - // TODO: check if the rotator job already exists or queued - // then create rotation job + log.Infof("rotation required for agent(id: %s)", podIdx) + if err := o.createRotationJob(ctx, podIdx); err != nil { + return fmt.Errorf("creating rotation job: %w", err) + } + return nil +} + +func (o *operator) createRotationJob(ctx context.Context, podIdx string) error { + var cronJob batchv1.CronJob + if err := o.client.Get(ctx, o.rotatorName, o.namespace, &cronJob); err != nil { + return err + } + + // get all the rotation jobs and make sure the job is not running + var jobList batchv1.JobList + selector, err := o.client.LabelSelector("app", client.SelectionOpEquals, []string{o.rotatorName}) + if err != nil { + return fmt.Errorf("creating label selector: %w", err) + } + if err := o.client.List(ctx, &jobList, &client.ListOptions{ + Namespace: o.namespace, + LabelSelector: selector, + }); err != nil { + return fmt.Errorf("listing jobs: %w", err) + } + for _, job := range jobList.Items { + // no need to check finished jobs + if job.Status.Active == 0 { + continue + } + + envs := job.Spec.Template.Spec.Containers[0].Env + // since latest append wins, checking backbards + for i := len(envs) - 1; i >= 0; i-- { + env := envs[i] + if env.Name == o.targetReadReplicaIDEnvName { + if env.Value == podIdx { + log.Infof("rotation job for the agent(id: %s) is already running. skipping...", podIdx) + return nil + } else { + break + } + } + } + } + + // now we actually needs to create the rotator job + log.Info("no job is running to rotate the agent(id:%s). creating a new job...", podIdx) + spec := *cronJob.Spec.JobTemplate.Spec.DeepCopy() + spec.Template.Spec.Containers[0].Env = append(spec.Template.Spec.Containers[0].Env, corev1.EnvVar{ + Name: o.targetReadReplicaIDEnvName, + Value: podIdx, + }) + + job := batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: cronJob.Name + "-", + Namespace: o.namespace, + }, + Spec: spec, + } + + if err := o.client.Create(ctx, &job); err != nil { + return err + } + return nil } diff --git a/pkg/index/operator/usecase/operator.go b/pkg/index/operator/usecase/operator.go index 56ffbe02de0..8eb9bd56a33 100644 --- a/pkg/index/operator/usecase/operator.go +++ b/pkg/index/operator/usecase/operator.go @@ -44,6 +44,8 @@ func New(cfg *config.Data) (_ runner.Runner, err error) { operator, err := service.New( cfg.Operator.Namespace, cfg.Operator.AgentName, + cfg.Operator.RotatorName, + cfg.Operator.TargetReadReplicaIDEnvname, service.WithReadReplicaEnabled(cfg.Operator.ReadReplicaEnabled), service.WithReadReplicaLabelKey(cfg.Operator.ReadReplicaLabelKey), )