Skip to content

Commit

Permalink
Add rotate-all option to rotator
Browse files Browse the repository at this point in the history
  • Loading branch information
ykadowak committed Jan 23, 2024
1 parent 2ca7187 commit 7b7c72f
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 50 deletions.
1 change: 1 addition & 0 deletions internal/k8s/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const (
DeletePropagationBackground = metav1.DeletePropagationBackground
WatchDeletedEvent = watch.Deleted
SelectionOpEquals = selection.Equals
SelectionOpExists = selection.Exists
)

type Client interface {
Expand Down
155 changes: 105 additions & 50 deletions pkg/index/job/readreplica/rotate/service/rotator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/vdaas/vald/internal/k8s/client"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/internal/sync/errgroup"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
Expand All @@ -33,7 +34,8 @@ import (
)

const (
apiName = "vald/index/job/readreplica/rotate"
apiName = "vald/index/job/readreplica/rotate"
rotateAllId = "rotate-all"
)

// Rotator represents an interface for indexing.
Expand All @@ -45,9 +47,13 @@ type rotator struct {
namespace string
volumeName string
readReplicaLabelKey string
readReplicaID string
client client.Client
listOpts client.ListOptions
subProcesses []subProcess
}

type subProcess struct {
listOpts client.ListOptions
client client.Client
volumeName string
}

// New returns Indexer object if no error occurs.
Expand All @@ -57,7 +63,6 @@ func New(replicaID string, opts ...Option) (Rotator, error) {
if replicaID == "" {
return nil, fmt.Errorf("readreplica id is empty. it should be set via MY_TARGET_REPLICA_ID env var")
}
r.readReplicaID = replicaID

for _, opt := range append(defaultOpts, opts...) {
if err := opt(r); err != nil {
Expand All @@ -75,17 +80,44 @@ func New(replicaID string, opts ...Option) (Rotator, error) {
if err != nil {
return nil, fmt.Errorf("failed to create kubernetes client: %w", err)
}
r.client = c

selector, err := c.LabelSelector(r.readReplicaLabelKey, client.SelectionOpEquals, []string{r.readReplicaID})
if err != nil {
return nil, err
}
r.listOpts = client.ListOptions{
Namespace: r.namespace,
LabelSelector: selector,
}
if replicaID == rotateAllId {
var deploymentList appsv1.DeploymentList
selector, err := c.LabelSelector(r.readReplicaLabelKey, client.SelectionOpExists, []string{})
if err != nil {
return nil, err
}
c.List(context.Background(), &deploymentList, &client.ListOptions{
Namespace: r.namespace,
LabelSelector: selector,
})

deployments := deploymentList.Items
if len(deployments) == 0 {
return nil, fmt.Errorf("no read replica found to rotate")
}

Check warning on line 98 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L84-L98

Added lines #L84 - L98 were not covered by tests

var ids []string
for _, deployment := range deployments {
ids = append(ids, deployment.Labels[r.readReplicaLabelKey])
}

Check warning on line 103 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L100-L103

Added lines #L100 - L103 were not covered by tests

for _, id := range ids {
sub, err := r.newSubprocess(c, id)
if err != nil {
return nil, fmt.Errorf("failed to create rotator subprocess: %w", err)
}

Check warning on line 109 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L105-L109

Added lines #L105 - L109 were not covered by tests

r.subProcesses = append(r.subProcesses, sub)

Check warning on line 111 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L111

Added line #L111 was not covered by tests
}
} else {
sub, err := r.newSubprocess(c, replicaID)
if err != nil {
return nil, fmt.Errorf("failed to create rotator subprocess: %w", err)
}

Check warning on line 117 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L113-L117

Added lines #L113 - L117 were not covered by tests

r.subProcesses = append(r.subProcesses, sub)

Check warning on line 119 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L119

Added line #L119 was not covered by tests
}
return r, nil
}

Expand All @@ -98,68 +130,91 @@ func (r *rotator) Start(ctx context.Context) error {
}
}()

if err := r.rotate(ctx); err != nil {
if span != nil {
span.RecordError(err)
span.SetStatus(trace.StatusError, err.Error())
}
return err
eg, ectx := errgroup.New(ctx)
for _, sub := range r.subProcesses {
s := sub
eg.Go(safety.RecoverFunc(func() (err error) {
if err := s.rotate(ectx); err != nil {
if span != nil {
span.RecordError(err)
span.SetStatus(trace.StatusError, err.Error())
}
return err

Check warning on line 142 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L133-L142

Added lines #L133 - L142 were not covered by tests
}
return nil

Check warning on line 144 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L144

Added line #L144 was not covered by tests
}))
}

return nil
return eg.Wait()

Check warning on line 148 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L148

Added line #L148 was not covered by tests
}

func (r *rotator) newSubprocess(c client.Client, replicaID string) (subProcess, error) {
selector, err := c.LabelSelector(r.readReplicaLabelKey, client.SelectionOpEquals, []string{replicaID})
if err != nil {
return subProcess{}, err
}
sub := subProcess{
client: c,
listOpts: client.ListOptions{
Namespace: r.namespace,
LabelSelector: selector,
},
volumeName: r.volumeName,
}
return sub, nil

Check warning on line 164 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L151-L164

Added lines #L151 - L164 were not covered by tests
}

func (r *rotator) rotate(ctx context.Context) error {
func (s *subProcess) rotate(ctx context.Context) error {

Check warning on line 167 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L167

Added line #L167 was not covered by tests
// get deployment here to pass to create methods of snapshot and pvc
// and put it as owner reference of them so that they will be deleted when the deployment is deleted
deployment, err := r.getDeployment(ctx)
deployment, err := s.getDeployment(ctx)

Check warning on line 170 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L170

Added line #L170 was not covered by tests
if err != nil {
log.Errorf("failed to get Deployment.")
return err
}

newSnap, oldSnap, err := r.createSnapshot(ctx, deployment)
newSnap, oldSnap, err := s.createSnapshot(ctx, deployment)

Check warning on line 176 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L176

Added line #L176 was not covered by tests
if err != nil {
return err
}

newPvc, oldPvc, err := r.createPVC(ctx, newSnap.GetName(), deployment)
newPvc, oldPvc, err := s.createPVC(ctx, newSnap.GetName(), deployment)

Check warning on line 181 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L181

Added line #L181 was not covered by tests
if err != nil {
log.Errorf("failed to create PVC. removing the new snapshot(%s)...", newSnap.GetName())
if dserr := r.deleteSnapshot(ctx, newSnap); dserr != nil {
if dserr := s.deleteSnapshot(ctx, newSnap); dserr != nil {

Check warning on line 184 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L184

Added line #L184 was not covered by tests
errors.Join(err, dserr)
}
return err
}

err = r.updateDeployment(ctx, newPvc.GetName(), deployment)
err = s.updateDeployment(ctx, newPvc.GetName(), deployment)

Check warning on line 190 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L190

Added line #L190 was not covered by tests
if err != nil {
log.Errorf("failed to update Deployment. removing the new snapshot(%s) and pvc(%s)...", newSnap.GetName(), newPvc.GetName())
if dperr := r.deletePVC(ctx, newPvc); dperr != nil {
if dperr := s.deletePVC(ctx, newPvc); dperr != nil {

Check warning on line 193 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L193

Added line #L193 was not covered by tests
errors.Join(err, dperr)
}
if dserr := r.deleteSnapshot(ctx, newSnap); dserr != nil {
if dserr := s.deleteSnapshot(ctx, newSnap); dserr != nil {

Check warning on line 196 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L196

Added line #L196 was not covered by tests
errors.Join(err, dserr)
}
return err
}

err = r.deleteSnapshot(ctx, oldSnap)
err = s.deleteSnapshot(ctx, oldSnap)

Check warning on line 202 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L202

Added line #L202 was not covered by tests
if err != nil {
return err
}

err = r.deletePVC(ctx, oldPvc)
err = s.deletePVC(ctx, oldPvc)

Check warning on line 207 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L207

Added line #L207 was not covered by tests
if err != nil {
return err
}

return nil
}

func (r *rotator) createSnapshot(ctx context.Context, deployment *appsv1.Deployment) (newSnap, oldSnap *client.VolumeSnapshot, err error) {
func (s *subProcess) createSnapshot(ctx context.Context, deployment *appsv1.Deployment) (newSnap, oldSnap *client.VolumeSnapshot, err error) {

Check warning on line 215 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L215

Added line #L215 was not covered by tests
list := snapshotv1.VolumeSnapshotList{}
if err := r.client.List(ctx, &list, &r.listOpts); err != nil {
if err := s.client.List(ctx, &list, &s.listOpts); err != nil {

Check warning on line 217 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L217

Added line #L217 was not covered by tests
return nil, nil, fmt.Errorf("failed to get snapshot: %w", err)
}
if len(list.Items) == 0 {
Expand Down Expand Up @@ -193,17 +248,17 @@ func (r *rotator) createSnapshot(ctx context.Context, deployment *appsv1.Deploym
log.Infof("creating new snapshot(%s)...", newSnap.GetName())
log.Debugf("snapshot detail: %#v", newSnap)

err = r.client.Create(ctx, newSnap)
err = s.client.Create(ctx, newSnap)

Check warning on line 251 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L251

Added line #L251 was not covered by tests
if err != nil {
return nil, nil, fmt.Errorf("failed to create snapshot: %w", err)
}

return newSnap, oldSnap, nil
}

func (r *rotator) createPVC(ctx context.Context, newSnapShot string, deployment *appsv1.Deployment) (newPvc, oldPvc *v1.PersistentVolumeClaim, err error) {
func (s *subProcess) createPVC(ctx context.Context, newSnapShot string, deployment *appsv1.Deployment) (newPvc, oldPvc *v1.PersistentVolumeClaim, err error) {

Check warning on line 259 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L259

Added line #L259 was not covered by tests
list := v1.PersistentVolumeClaimList{}
if err := r.client.List(ctx, &list, &r.listOpts); err != nil {
if err := s.client.List(ctx, &list, &s.listOpts); err != nil {

Check warning on line 261 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L261

Added line #L261 was not covered by tests
return nil, nil, fmt.Errorf("failed to get PVC: %w", err)
}
if len(list.Items) == 0 {
Expand Down Expand Up @@ -248,16 +303,16 @@ func (r *rotator) createPVC(ctx context.Context, newSnapShot string, deployment
log.Infof("creating new pvc(%s)...", newPvc.GetName())
log.Debugf("pvc detail: %#v", newPvc)

if err := r.client.Create(ctx, newPvc); err != nil {
if err := s.client.Create(ctx, newPvc); err != nil {

Check warning on line 306 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L306

Added line #L306 was not covered by tests
return nil, nil, fmt.Errorf("failed to create PVC(%s): %w", newPvc.GetName(), err)
}

return newPvc, oldPvc, nil
}

func (r *rotator) getDeployment(ctx context.Context) (*appsv1.Deployment, error) {
func (s *subProcess) getDeployment(ctx context.Context) (*appsv1.Deployment, error) {

Check warning on line 313 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L313

Added line #L313 was not covered by tests
list := appsv1.DeploymentList{}
if err := r.client.List(ctx, &list, &r.listOpts); err != nil {
if err := s.client.List(ctx, &list, &s.listOpts); err != nil {

Check warning on line 315 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L315

Added line #L315 was not covered by tests
return nil, fmt.Errorf("failed to get deployment through client: %w", err)
}
if len(list.Items) == 0 {
Expand All @@ -267,34 +322,34 @@ func (r *rotator) getDeployment(ctx context.Context) (*appsv1.Deployment, error)
return &list.Items[0], nil
}

func (r *rotator) updateDeployment(ctx context.Context, newPVC string, deployment *appsv1.Deployment) error {
func (s *subProcess) updateDeployment(ctx context.Context, newPVC string, deployment *appsv1.Deployment) error {

Check warning on line 325 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L325

Added line #L325 was not covered by tests
if deployment.Spec.Template.ObjectMeta.Annotations == nil {
deployment.Spec.Template.ObjectMeta.Annotations = map[string]string{}
}
deployment.Spec.Template.ObjectMeta.Annotations["kubectl.kubernetes.io/restartedAt"] = time.Now().Format(time.RFC3339)

for _, vol := range deployment.Spec.Template.Spec.Volumes {
if vol.Name == r.volumeName {
if vol.Name == s.volumeName {

Check warning on line 332 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L332

Added line #L332 was not covered by tests
vol.PersistentVolumeClaim.ClaimName = newPVC
}
}

log.Infof("updating deployment(%s)...", deployment.GetName())
log.Debugf("deployment detail: %#v", deployment)

if err := r.client.Update(ctx, deployment); err != nil {
if err := s.client.Update(ctx, deployment); err != nil {

Check warning on line 340 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L340

Added line #L340 was not covered by tests
return fmt.Errorf("failed to update deployment: %w", err)
}

return nil
}

func (r *rotator) deleteSnapshot(ctx context.Context, snapshot *snapshotv1.VolumeSnapshot) error {
watcher, err := r.client.Watch(ctx,
func (s *subProcess) deleteSnapshot(ctx context.Context, snapshot *snapshotv1.VolumeSnapshot) error {
watcher, err := s.client.Watch(ctx,

Check warning on line 348 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L347-L348

Added lines #L347 - L348 were not covered by tests
&snapshotv1.VolumeSnapshotList{
Items: []snapshotv1.VolumeSnapshot{*snapshot},
},
&r.listOpts,
&s.listOpts,

Check warning on line 352 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L352

Added line #L352 was not covered by tests
)
if err != nil {
return fmt.Errorf("failed to watch snapshot(%s): %w", snapshot.GetName(), err)
Expand All @@ -320,18 +375,18 @@ func (r *rotator) deleteSnapshot(ctx context.Context, snapshot *snapshotv1.Volum
}
})

if err := r.client.Delete(ctx, snapshot); err != nil {
if err := s.client.Delete(ctx, snapshot); err != nil {

Check warning on line 378 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L378

Added line #L378 was not covered by tests
return fmt.Errorf("failed to delete snapshot: %w", err)
}
return eg.Wait()
}

func (r *rotator) deletePVC(ctx context.Context, pvc *v1.PersistentVolumeClaim) error {
watcher, err := r.client.Watch(ctx,
func (s *subProcess) deletePVC(ctx context.Context, pvc *v1.PersistentVolumeClaim) error {
watcher, err := s.client.Watch(ctx,

Check warning on line 385 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L384-L385

Added lines #L384 - L385 were not covered by tests
&v1.PersistentVolumeClaimList{
Items: []v1.PersistentVolumeClaim{*pvc},
},
&r.listOpts,
&s.listOpts,

Check warning on line 389 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L389

Added line #L389 was not covered by tests
)
if err != nil {
return fmt.Errorf("failed to watch PVC: %w", err)
Expand All @@ -357,7 +412,7 @@ func (r *rotator) deletePVC(ctx context.Context, pvc *v1.PersistentVolumeClaim)
}
})

if err := r.client.Delete(ctx, pvc); err != nil {
if err := s.client.Delete(ctx, pvc); err != nil {

Check warning on line 415 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L415

Added line #L415 was not covered by tests
return fmt.Errorf("failed to delete PVC(%s): %w", pvc.GetName(), err)
}

Expand Down

0 comments on commit 7b7c72f

Please sign in to comment.