Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add local provider support for compaction #682

Merged
merged 3 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 64 additions & 36 deletions controllers/compaction/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func (r *Reconciler) reconcileJob(ctx context.Context, logger logr.Logger, etcd
logger.Info("Creating etcd compaction job", "namespace", etcd.Namespace, "name", etcd.GetCompactionJobName())
job, err = r.createCompactionJob(ctx, logger, etcd)
if err != nil {
metricJobsTotal.With(prometheus.Labels{druidmetrics.LabelSucceeded: druidmetrics.ValueSucceededFalse, druidmetrics.EtcdNamespace: etcd.Namespace}).Inc()
return ctrl.Result{
RequeueAfter: 10 * time.Second,
}, fmt.Errorf("error during compaction job creation: %v", err)
Expand Down Expand Up @@ -298,15 +299,39 @@ func (r *Reconciler) createCompactionJob(ctx context.Context, logger logr.Logger
Image: *etcdBackupImage,
ImagePullPolicy: v1.PullIfNotPresent,
Args: getCompactionJobArgs(etcd, r.config.MetricsScrapeWaitDuration.String()),
VolumeMounts: getCompactionJobVolumeMounts(etcd, logger),
Env: getCompactionJobEnvVar(etcd, logger),
}},
Volumes: getCompactionJobVolumes(etcd, logger),
},
},
},
}

if vms, err := getCompactionJobVolumeMounts(etcd); err != nil {
return nil, fmt.Errorf("error while creating compaction job in %v for %v : %v",
etcd.Namespace,
etcd.Name,
err)
} else {
shreyas-s-rao marked this conversation as resolved.
Show resolved Hide resolved
job.Spec.Template.Spec.Containers[0].VolumeMounts = vms
}

if env, err := getCompactionJobEnvVar(etcd); err != nil {
return nil, fmt.Errorf("error while creating compaction job in %v for %v : %v",
etcd.Namespace,
etcd.Name,
err)
} else {
shreyas-s-rao marked this conversation as resolved.
Show resolved Hide resolved
job.Spec.Template.Spec.Containers[0].Env = env
}

if vm, err := getCompactionJobVolumes(ctx, r.Client, r.logger, etcd); err != nil {
return nil, fmt.Errorf("error creating compaction job in %v for %v : %v",
etcd.Namespace,
etcd.Name,
err)
} else {
shreyas-s-rao marked this conversation as resolved.
Show resolved Hide resolved
job.Spec.Template.Spec.Volumes = vm
}

if etcd.Spec.Backup.CompactionResources != nil {
job.Spec.Template.Spec.Containers[0].Resources = *etcd.Spec.Backup.CompactionResources
}
Expand All @@ -331,40 +356,40 @@ func getLabels(etcd *druidv1alpha1.Etcd) map[string]string {
"networking.gardener.cloud/to-public-networks": "allowed",
}
}
func getCompactionJobVolumeMounts(etcd *druidv1alpha1.Etcd, logger logr.Logger) []v1.VolumeMount {
func getCompactionJobVolumeMounts(etcd *druidv1alpha1.Etcd) ([]v1.VolumeMount, error) {
vms := []v1.VolumeMount{
{
Name: "etcd-workspace-dir",
MountPath: "/var/etcd/data",
},
}

if etcd.Spec.Backup.Store == nil {
return vms
}

provider, err := utils.StorageProviderFromInfraProvider(etcd.Spec.Backup.Store.Provider)
if err != nil {
logger.Error(err, "Storage provider is not recognized. Compaction job will not mount any volume with provider specific credentials", "namespace", etcd.Namespace, "name", etcd.Name)
return vms
return vms, fmt.Errorf("storage provider is not recognized while fetching volume mounts")
}

if provider == utils.GCS {
switch provider {
case utils.Local:
vms = append(vms, v1.VolumeMount{
Name: "host-storage",
MountPath: pointer.StringDeref(etcd.Spec.Backup.Store.Container, ""),
})
abdasgupta marked this conversation as resolved.
Show resolved Hide resolved
case utils.GCS:
vms = append(vms, v1.VolumeMount{
Name: "etcd-backup",
MountPath: "/var/.gcp/",
})
} else if provider == utils.S3 || provider == utils.ABS || provider == utils.OSS || provider == utils.Swift || provider == utils.OCS {
case utils.S3, utils.ABS, utils.OSS, utils.Swift, utils.OCS:
vms = append(vms, v1.VolumeMount{
Name: "etcd-backup",
MountPath: "/var/etcd-backup/",
})
}

return vms
return vms, nil
}

func getCompactionJobVolumes(etcd *druidv1alpha1.Etcd, logger logr.Logger) []v1.Volume {
func getCompactionJobVolumes(ctx context.Context, cl client.Client, logger logr.Logger, etcd *druidv1alpha1.Etcd) ([]v1.Volume, error) {
shreyas-s-rao marked this conversation as resolved.
Show resolved Hide resolved
vs := []v1.Volume{
{
Name: "etcd-workspace-dir",
Expand All @@ -374,22 +399,31 @@ func getCompactionJobVolumes(etcd *druidv1alpha1.Etcd, logger logr.Logger) []v1.
},
}

if etcd.Spec.Backup.Store == nil {
return vs
}

storeValues := etcd.Spec.Backup.Store
provider, err := utils.StorageProviderFromInfraProvider(storeValues.Provider)
if err != nil {
logger.Error(err, "Storage provider is not recognized. Compaction job will fail as no storage could be configured", "namespace", etcd.Namespace, "name", etcd.Name)
return vs
return vs, fmt.Errorf("could not recognize storage provider while fetching volumes")
}
switch provider {
case "Local":
hostPath, err := utils.GetHostMountPathFromSecretRef(ctx, cl, logger, storeValues, etcd.Namespace)
if err != nil {
return vs, fmt.Errorf("could not determine host mount path for local provider")
}

if provider == utils.GCS || provider == utils.S3 || provider == utils.OSS || provider == utils.ABS || provider == utils.Swift || provider == utils.OCS {
hpt := v1.HostPathDirectory
vs = append(vs, v1.Volume{
Name: "host-storage",
VolumeSource: v1.VolumeSource{
HostPath: &v1.HostPathVolumeSource{
Path: hostPath + "/" + pointer.StringDeref(storeValues.Container, ""),
shreyas-s-rao marked this conversation as resolved.
Show resolved Hide resolved
Type: &hpt,
},
},
})
case utils.GCS, utils.S3, utils.OSS, utils.ABS, utils.Swift, utils.OCS:
if storeValues.SecretRef == nil {
logger.Info("No secretRef is configured for backup store. Compaction job will fail as no storage could be configured.",
"namespace", etcd.Namespace, "name", etcd.Name)
return vs
return vs, fmt.Errorf("could not configure secretRef for backup store %v", provider)
}

vs = append(vs, v1.Volume{
Expand All @@ -402,14 +436,11 @@ func getCompactionJobVolumes(etcd *druidv1alpha1.Etcd, logger logr.Logger) []v1.
})
}

return vs
return vs, nil
}

func getCompactionJobEnvVar(etcd *druidv1alpha1.Etcd, logger logr.Logger) []v1.EnvVar {
func getCompactionJobEnvVar(etcd *druidv1alpha1.Etcd) ([]v1.EnvVar, error) {
var env []v1.EnvVar
if etcd.Spec.Backup.Store == nil {
return env
}

storeValues := etcd.Spec.Backup.Store

Expand All @@ -418,8 +449,7 @@ func getCompactionJobEnvVar(etcd *druidv1alpha1.Etcd, logger logr.Logger) []v1.E

provider, err := utils.StorageProviderFromInfraProvider(etcd.Spec.Backup.Store.Provider)
if err != nil {
logger.Error(err, "Storage provider is not recognized. Compaction job will likely fail as there is no provider specific credentials.", "namespace", etcd.Namespace, "name", etcd.Name)
return env
return env, fmt.Errorf("storage provider is not recognized while fetching secrets from environment variable")
}

switch provider {
Expand All @@ -435,9 +465,7 @@ func getCompactionJobEnvVar(etcd *druidv1alpha1.Etcd, logger logr.Logger) []v1.E
env = append(env, getEnvVarFromValues("ALICLOUD_APPLICATION_CREDENTIALS", "/var/etcd-backup"))
case utils.ECS:
if storeValues.SecretRef == nil {
logger.Info("No secretRef is configured for backup store. Compaction job will fail as no storage could be configured.",
"namespace", etcd.Namespace, "name", etcd.Name)
return env
return env, fmt.Errorf("no secretRef could be configured for backup store of ECS")
}

env = append(env, getEnvVarFromSecrets("ECS_ENDPOINT", storeValues.SecretRef.Name, "endpoint"))
Expand All @@ -447,7 +475,7 @@ func getCompactionJobEnvVar(etcd *druidv1alpha1.Etcd, logger logr.Logger) []v1.E
env = append(env, getEnvVarFromValues("OPENSHIFT_APPLICATION_CREDENTIALS", "/var/etcd-backup"))
}

return env
return env, nil
}

func getEnvVarFromValues(name, value string) v1.EnvVar {
Expand Down
12 changes: 10 additions & 2 deletions test/integration/controllers/compaction/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ var _ = Describe("Compaction Controller", func() {

deleteEtcdSnapshotLeasesAndWait(k8sClient, instance)
},
Entry("if fields are set in etcd.Spec and TLS enabled, the resources should reflect the spec changes", "foo71", druidv1alpha1.StorageProvider("Local"), validateEtcdForCompactionJob),
Entry("if fields are set in etcd.Spec and TLS enabled, the resources should reflect the spec changes", "foo71", druidv1alpha1.StorageProvider("local"), validateEtcdForCompactionJob),
Entry("if the store is S3, the statefulset and compaction job should reflect the spec changes", "foo72", druidv1alpha1.StorageProvider("aws"), validateStoreAWSForCompactionJob),
Entry("if the store is ABS, the statefulset and compaction job should reflect the spec changes", "foo73", druidv1alpha1.StorageProvider("azure"), validateStoreAzureForCompactionJob),
Entry("if the store is GCS, the statefulset and compaction job should reflect the spec changes", "foo74", druidv1alpha1.StorageProvider("gcp"), validateStoreGCPForCompactionJob),
Expand All @@ -112,7 +112,7 @@ var _ = Describe("Compaction Controller", func() {
ctx, cancel := context.WithTimeout(context.TODO(), timeout)
defer cancel()

instance = testutils.EtcdBuilderWithDefaults("foo77", namespace).Build()
instance = testutils.EtcdBuilderWithDefaults("foo77", namespace).WithProviderLocal().Build()
createEtcdAndWait(k8sClient, instance)

// manually create full and delta snapshot leases since etcd controller is not running
Expand Down Expand Up @@ -318,6 +318,14 @@ func validateEtcdForCompactionJob(instance *druidv1alpha1.Etcd, j *batchv1.Job)
})),
}),
}),
"host-storage": MatchFields(IgnoreExtras, Fields{
"Name": Equal("host-storage"),
"VolumeSource": MatchFields(IgnoreExtras, Fields{
"HostPath": PointTo(MatchFields(IgnoreExtras, Fields{
"Path": Equal("/etc/gardener/local-backupbuckets/default.bkp"),
})),
}),
}),
}),
}),
}),
Expand Down
21 changes: 21 additions & 0 deletions test/utils/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ func (eb *EtcdBuilder) WithStorageProvider(provider druidv1alpha1.StorageProvide
return eb.WithProviderGCS()
case "openstack":
return eb.WithProviderSwift()
case "local":
return eb.WithProviderLocal()
default:
return eb
}
Expand Down Expand Up @@ -226,6 +228,16 @@ func (eb *EtcdBuilder) WithProviderOSS() *EtcdBuilder {
return eb
}

func (eb *EtcdBuilder) WithProviderLocal() *EtcdBuilder {
if eb == nil || eb.etcd == nil {
return nil
}
eb.etcd.Spec.Backup.Store = getBackupStoreForLocal(
eb.etcd.Name,
)
return eb
}

func (eb *EtcdBuilder) Build() *druidv1alpha1.Etcd {
return eb.etcd
}
Expand Down Expand Up @@ -361,6 +373,15 @@ func getBackupStore(name string, provider druidv1alpha1.StorageProvider) *druidv
}
}

func getBackupStoreForLocal(name string) *druidv1alpha1.StoreSpec {
provider := druidv1alpha1.StorageProvider("local")
return &druidv1alpha1.StoreSpec{
Container: &container,
Prefix: name,
Provider: &provider,
}
}

shreyas-s-rao marked this conversation as resolved.
Show resolved Hide resolved
func CheckEtcdOwnerReference(refs []metav1.OwnerReference, etcd *druidv1alpha1.Etcd) bool {
for _, ownerRef := range refs {
if ownerRef.UID == etcd.UID {
Expand Down