Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rotate-all option to rotator #2305

Merged
merged 17 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/k8s/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const (
DeletePropagationBackground = metav1.DeletePropagationBackground
WatchDeletedEvent = watch.Deleted
SelectionOpEquals = selection.Equals
SelectionOpExists = selection.Exists
)

type Client interface {
Expand Down
155 changes: 105 additions & 50 deletions pkg/index/job/readreplica/rotate/service/rotator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"github.com/vdaas/vald/internal/k8s/client"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/observability/trace"
"github.com/vdaas/vald/internal/safety"
"github.com/vdaas/vald/internal/sync/errgroup"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
Expand All @@ -33,7 +34,8 @@
)

const (
apiName = "vald/index/job/readreplica/rotate"
apiName = "vald/index/job/readreplica/rotate"
rotateAllId = "rotate-all"
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
)

// Rotator represents an interface for rotating read replicas.
Expand All @@ -45,9 +47,13 @@
namespace string
volumeName string
readReplicaLabelKey string
readReplicaID string
client client.Client
listOpts client.ListOptions
subProcesses []subProcess
}

// subProcess holds what is needed to rotate a single read replica.
// One is built per replica ID by newSubprocess.
type subProcess struct {
// listOpts carries the namespace and the label selector matching one
// read replica ID; it is passed to the List and Watch calls of this subProcess.
listOpts client.ListOptions
// client is the Kubernetes client used for the List, Create, Update,
// Delete and Watch calls.
client client.Client
// volumeName is the name of the deployment volume whose PVC claim name is
// replaced in updateDeployment.
volumeName string
}

// New returns a Rotator object if no error occurs.
Expand All @@ -57,7 +63,6 @@
if replicaID == "" {
return nil, fmt.Errorf("readreplica id is empty. it should be set via MY_TARGET_REPLICA_ID env var")
}
r.readReplicaID = replicaID

for _, opt := range append(defaultOpts, opts...) {
if err := opt(r); err != nil {
Expand All @@ -75,17 +80,44 @@
if err != nil {
return nil, fmt.Errorf("failed to create kubernetes client: %w", err)
}
r.client = c

selector, err := c.LabelSelector(r.readReplicaLabelKey, client.SelectionOpEquals, []string{r.readReplicaID})
if err != nil {
return nil, err
}
r.listOpts = client.ListOptions{
Namespace: r.namespace,
LabelSelector: selector,
}
if replicaID == rotateAllId {
var deploymentList appsv1.DeploymentList
selector, err := c.LabelSelector(r.readReplicaLabelKey, client.SelectionOpExists, []string{})
if err != nil {
return nil, err
}
c.List(context.Background(), &deploymentList, &client.ListOptions{
Namespace: r.namespace,
LabelSelector: selector,
})

deployments := deploymentList.Items
if len(deployments) == 0 {
return nil, fmt.Errorf("no read replica found to rotate")
}

Check warning on line 98 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L84-L98

Added lines #L84 - L98 were not covered by tests

var ids []string
for _, deployment := range deployments {
ids = append(ids, deployment.Labels[r.readReplicaLabelKey])
}

Check warning on line 103 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L100-L103

Added lines #L100 - L103 were not covered by tests

for _, id := range ids {
sub, err := r.newSubprocess(c, id)
if err != nil {
return nil, fmt.Errorf("failed to create rotator subprocess: %w", err)
}

Check warning on line 109 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L105-L109

Added lines #L105 - L109 were not covered by tests

r.subProcesses = append(r.subProcesses, sub)

Check warning on line 111 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L111

Added line #L111 was not covered by tests
}
} else {
sub, err := r.newSubprocess(c, replicaID)
if err != nil {
return nil, fmt.Errorf("failed to create rotator subprocess: %w", err)
}

Check warning on line 117 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L113-L117

Added lines #L113 - L117 were not covered by tests

r.subProcesses = append(r.subProcesses, sub)

Check warning on line 119 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L119

Added line #L119 was not covered by tests
}
return r, nil
}

Expand All @@ -98,68 +130,91 @@
}
}()

if err := r.rotate(ctx); err != nil {
if span != nil {
span.RecordError(err)
span.SetStatus(trace.StatusError, err.Error())
}
return err
eg, ectx := errgroup.New(ctx)
for _, sub := range r.subProcesses {
s := sub
eg.Go(safety.RecoverFunc(func() (err error) {
if err := s.rotate(ectx); err != nil {
if span != nil {
span.RecordError(err)
span.SetStatus(trace.StatusError, err.Error())
}
return err

Check warning on line 142 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L133-L142

Added lines #L133 - L142 were not covered by tests
}
return nil

Check warning on line 144 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L144

Added line #L144 was not covered by tests
}))
}

return nil
return eg.Wait()

Check warning on line 148 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L148

Added line #L148 was not covered by tests
}

func (r *rotator) newSubprocess(c client.Client, replicaID string) (subProcess, error) {
selector, err := c.LabelSelector(r.readReplicaLabelKey, client.SelectionOpEquals, []string{replicaID})
if err != nil {
return subProcess{}, err
}
sub := subProcess{
client: c,
listOpts: client.ListOptions{
Namespace: r.namespace,
LabelSelector: selector,
},
volumeName: r.volumeName,
}
return sub, nil

Check warning on line 164 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L151-L164

Added lines #L151 - L164 were not covered by tests
}

func (r *rotator) rotate(ctx context.Context) error {
func (s *subProcess) rotate(ctx context.Context) error {

Check warning on line 167 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L167

Added line #L167 was not covered by tests
// get deployment here to pass to create methods of snapshot and pvc
// and put it as owner reference of them so that they will be deleted when the deployment is deleted
deployment, err := r.getDeployment(ctx)
deployment, err := s.getDeployment(ctx)

Check warning on line 170 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L170

Added line #L170 was not covered by tests
if err != nil {
log.Errorf("failed to get Deployment.")
return err
}

newSnap, oldSnap, err := r.createSnapshot(ctx, deployment)
newSnap, oldSnap, err := s.createSnapshot(ctx, deployment)

Check warning on line 176 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L176

Added line #L176 was not covered by tests
if err != nil {
return err
}

newPvc, oldPvc, err := r.createPVC(ctx, newSnap.GetName(), deployment)
newPvc, oldPvc, err := s.createPVC(ctx, newSnap.GetName(), deployment)

Check warning on line 181 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L181

Added line #L181 was not covered by tests
if err != nil {
log.Errorf("failed to create PVC. removing the new snapshot(%s)...", newSnap.GetName())
if dserr := r.deleteSnapshot(ctx, newSnap); dserr != nil {
if dserr := s.deleteSnapshot(ctx, newSnap); dserr != nil {

Check warning on line 184 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L184

Added line #L184 was not covered by tests
errors.Join(err, dserr)
}
return err
}

err = r.updateDeployment(ctx, newPvc.GetName(), deployment)
err = s.updateDeployment(ctx, newPvc.GetName(), deployment)

Check warning on line 190 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L190

Added line #L190 was not covered by tests
if err != nil {
log.Errorf("failed to update Deployment. removing the new snapshot(%s) and pvc(%s)...", newSnap.GetName(), newPvc.GetName())
if dperr := r.deletePVC(ctx, newPvc); dperr != nil {
if dperr := s.deletePVC(ctx, newPvc); dperr != nil {

Check warning on line 193 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L193

Added line #L193 was not covered by tests
errors.Join(err, dperr)
}
if dserr := r.deleteSnapshot(ctx, newSnap); dserr != nil {
if dserr := s.deleteSnapshot(ctx, newSnap); dserr != nil {

Check warning on line 196 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L196

Added line #L196 was not covered by tests
errors.Join(err, dserr)
}
return err
}

err = r.deleteSnapshot(ctx, oldSnap)
err = s.deleteSnapshot(ctx, oldSnap)

Check warning on line 202 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L202

Added line #L202 was not covered by tests
if err != nil {
return err
}

err = r.deletePVC(ctx, oldPvc)
err = s.deletePVC(ctx, oldPvc)

Check warning on line 207 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L207

Added line #L207 was not covered by tests
if err != nil {
return err
}

return nil
}

func (r *rotator) createSnapshot(ctx context.Context, deployment *appsv1.Deployment) (newSnap, oldSnap *client.VolumeSnapshot, err error) {
func (s *subProcess) createSnapshot(ctx context.Context, deployment *appsv1.Deployment) (newSnap, oldSnap *client.VolumeSnapshot, err error) {

Check warning on line 215 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L215

Added line #L215 was not covered by tests
list := snapshotv1.VolumeSnapshotList{}
if err := r.client.List(ctx, &list, &r.listOpts); err != nil {
if err := s.client.List(ctx, &list, &s.listOpts); err != nil {

Check warning on line 217 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L217

Added line #L217 was not covered by tests
return nil, nil, fmt.Errorf("failed to get snapshot: %w", err)
}
if len(list.Items) == 0 {
Expand Down Expand Up @@ -193,17 +248,17 @@
log.Infof("creating new snapshot(%s)...", newSnap.GetName())
log.Debugf("snapshot detail: %#v", newSnap)

err = r.client.Create(ctx, newSnap)
err = s.client.Create(ctx, newSnap)

Check warning on line 251 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L251

Added line #L251 was not covered by tests
if err != nil {
return nil, nil, fmt.Errorf("failed to create snapshot: %w", err)
}

return newSnap, oldSnap, nil
}

func (r *rotator) createPVC(ctx context.Context, newSnapShot string, deployment *appsv1.Deployment) (newPvc, oldPvc *v1.PersistentVolumeClaim, err error) {
func (s *subProcess) createPVC(ctx context.Context, newSnapShot string, deployment *appsv1.Deployment) (newPvc, oldPvc *v1.PersistentVolumeClaim, err error) {

Check warning on line 259 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L259

Added line #L259 was not covered by tests
list := v1.PersistentVolumeClaimList{}
if err := r.client.List(ctx, &list, &r.listOpts); err != nil {
if err := s.client.List(ctx, &list, &s.listOpts); err != nil {

Check warning on line 261 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L261

Added line #L261 was not covered by tests
return nil, nil, fmt.Errorf("failed to get PVC: %w", err)
}
if len(list.Items) == 0 {
Expand Down Expand Up @@ -248,16 +303,16 @@
log.Infof("creating new pvc(%s)...", newPvc.GetName())
log.Debugf("pvc detail: %#v", newPvc)

if err := r.client.Create(ctx, newPvc); err != nil {
if err := s.client.Create(ctx, newPvc); err != nil {

Check warning on line 306 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L306

Added line #L306 was not covered by tests
return nil, nil, fmt.Errorf("failed to create PVC(%s): %w", newPvc.GetName(), err)
}

return newPvc, oldPvc, nil
}

func (r *rotator) getDeployment(ctx context.Context) (*appsv1.Deployment, error) {
func (s *subProcess) getDeployment(ctx context.Context) (*appsv1.Deployment, error) {

Check warning on line 313 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L313

Added line #L313 was not covered by tests
list := appsv1.DeploymentList{}
if err := r.client.List(ctx, &list, &r.listOpts); err != nil {
if err := s.client.List(ctx, &list, &s.listOpts); err != nil {

Check warning on line 315 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L315

Added line #L315 was not covered by tests
return nil, fmt.Errorf("failed to get deployment through client: %w", err)
}
if len(list.Items) == 0 {
Expand All @@ -267,34 +322,34 @@
return &list.Items[0], nil
}

func (r *rotator) updateDeployment(ctx context.Context, newPVC string, deployment *appsv1.Deployment) error {
func (s *subProcess) updateDeployment(ctx context.Context, newPVC string, deployment *appsv1.Deployment) error {

Check warning on line 325 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L325

Added line #L325 was not covered by tests
if deployment.Spec.Template.ObjectMeta.Annotations == nil {
deployment.Spec.Template.ObjectMeta.Annotations = map[string]string{}
}
deployment.Spec.Template.ObjectMeta.Annotations["kubectl.kubernetes.io/restartedAt"] = time.Now().Format(time.RFC3339)

for _, vol := range deployment.Spec.Template.Spec.Volumes {
if vol.Name == r.volumeName {
if vol.Name == s.volumeName {

Check warning on line 332 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L332

Added line #L332 was not covered by tests
vol.PersistentVolumeClaim.ClaimName = newPVC
}
}

log.Infof("updating deployment(%s)...", deployment.GetName())
log.Debugf("deployment detail: %#v", deployment)

if err := r.client.Update(ctx, deployment); err != nil {
if err := s.client.Update(ctx, deployment); err != nil {

Check warning on line 340 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L340

Added line #L340 was not covered by tests
return fmt.Errorf("failed to update deployment: %w", err)
}

return nil
}

func (r *rotator) deleteSnapshot(ctx context.Context, snapshot *snapshotv1.VolumeSnapshot) error {
watcher, err := r.client.Watch(ctx,
func (s *subProcess) deleteSnapshot(ctx context.Context, snapshot *snapshotv1.VolumeSnapshot) error {
watcher, err := s.client.Watch(ctx,

Check warning on line 348 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L347-L348

Added lines #L347 - L348 were not covered by tests
&snapshotv1.VolumeSnapshotList{
Items: []snapshotv1.VolumeSnapshot{*snapshot},
},
&r.listOpts,
&s.listOpts,

Check warning on line 352 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L352

Added line #L352 was not covered by tests
)
if err != nil {
return fmt.Errorf("failed to watch snapshot(%s): %w", snapshot.GetName(), err)
Expand All @@ -320,18 +375,18 @@
}
})

if err := r.client.Delete(ctx, snapshot); err != nil {
if err := s.client.Delete(ctx, snapshot); err != nil {

Check warning on line 378 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L378

Added line #L378 was not covered by tests
return fmt.Errorf("failed to delete snapshot: %w", err)
}
return eg.Wait()
}

func (r *rotator) deletePVC(ctx context.Context, pvc *v1.PersistentVolumeClaim) error {
watcher, err := r.client.Watch(ctx,
func (s *subProcess) deletePVC(ctx context.Context, pvc *v1.PersistentVolumeClaim) error {
watcher, err := s.client.Watch(ctx,

Check warning on line 385 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L384-L385

Added lines #L384 - L385 were not covered by tests
&v1.PersistentVolumeClaimList{
Items: []v1.PersistentVolumeClaim{*pvc},
},
&r.listOpts,
&s.listOpts,

Check warning on line 389 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L389

Added line #L389 was not covered by tests
)
if err != nil {
return fmt.Errorf("failed to watch PVC: %w", err)
Expand All @@ -357,7 +412,7 @@
}
})

if err := r.client.Delete(ctx, pvc); err != nil {
if err := s.client.Delete(ctx, pvc); err != nil {

Check warning on line 415 in pkg/index/job/readreplica/rotate/service/rotator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/readreplica/rotate/service/rotator.go#L415

Added line #L415 was not covered by tests
return fmt.Errorf("failed to delete PVC(%s): %w", pvc.GetName(), err)
}

Expand Down
Loading