diff --git a/controllers/compaction/reconciler.go b/controllers/compaction/reconciler.go
index c1244506a..5a57aa044 100644
--- a/controllers/compaction/reconciler.go
+++ b/controllers/compaction/reconciler.go
@@ -180,6 +180,7 @@ func (r *Reconciler) reconcileJob(ctx context.Context, logger logr.Logger, etcd
 		logger.Info("Creating etcd compaction job", "namespace", etcd.Namespace, "name", etcd.GetCompactionJobName())
 		job, err = r.createCompactionJob(ctx, logger, etcd)
 		if err != nil {
+			metricJobsTotal.With(prometheus.Labels{druidmetrics.LabelSucceeded: druidmetrics.ValueSucceededFalse, druidmetrics.EtcdNamespace: etcd.Namespace}).Inc()
 			return ctrl.Result{
 				RequeueAfter: 10 * time.Second,
 			}, fmt.Errorf("error during compaction job creation: %v", err)
@@ -298,15 +299,39 @@ func (r *Reconciler) createCompactionJob(ctx context.Context, logger logr.Logger
 						Image:           *etcdBackupImage,
 						ImagePullPolicy: v1.PullIfNotPresent,
 						Args:            getCompactionJobArgs(etcd, r.config.MetricsScrapeWaitDuration.String()),
-						VolumeMounts:    getCompactionJobVolumeMounts(etcd, logger),
-						Env:             getCompactionJobEnvVar(etcd, logger),
 					}},
-					Volumes: getCompactionJobVolumes(etcd, logger),
 				},
 			},
 		},
 	}
+	if vms, err := getCompactionJobVolumeMounts(etcd); err != nil {
+		return nil, fmt.Errorf("error while creating compaction job in %v for %v : %v",
+			etcd.Namespace,
+			etcd.Name,
+			err)
+	} else {
+		job.Spec.Template.Spec.Containers[0].VolumeMounts = vms
+	}
+
+	if env, err := getCompactionJobEnvVar(etcd); err != nil {
+		return nil, fmt.Errorf("error while creating compaction job in %v for %v : %v",
+			etcd.Namespace,
+			etcd.Name,
+			err)
+	} else {
+		job.Spec.Template.Spec.Containers[0].Env = env
+	}
+
+	if vm, err := getCompactionJobVolumes(ctx, r.Client, r.logger, etcd); err != nil {
+		return nil, fmt.Errorf("error creating compaction job in %v for %v : %v",
+			etcd.Namespace,
+			etcd.Name,
+			err)
+	} else {
+		job.Spec.Template.Spec.Volumes = vm
+	}
+
 
 	if etcd.Spec.Backup.CompactionResources != nil {
 		job.Spec.Template.Spec.Containers[0].Resources = *etcd.Spec.Backup.CompactionResources
 	}
@@ -331,7 +356,7 @@ func getLabels(etcd *druidv1alpha1.Etcd) map[string]string {
 		"networking.gardener.cloud/to-public-networks": "allowed",
 	}
 }
-func getCompactionJobVolumeMounts(etcd *druidv1alpha1.Etcd, logger logr.Logger) []v1.VolumeMount {
+func getCompactionJobVolumeMounts(etcd *druidv1alpha1.Etcd) ([]v1.VolumeMount, error) {
 	vms := []v1.VolumeMount{
 		{
 			Name:      "etcd-workspace-dir",
@@ -339,32 +364,32 @@ func getCompactionJobVolumeMounts(etcd *druidv1alpha1.Etcd, logger logr.Logger)
 		},
 	}
 
-	if etcd.Spec.Backup.Store == nil {
-		return vms
-	}
-
 	provider, err := utils.StorageProviderFromInfraProvider(etcd.Spec.Backup.Store.Provider)
 	if err != nil {
-		logger.Error(err, "Storage provider is not recognized. Compaction job will not mount any volume with provider specific credentials", "namespace", etcd.Namespace, "name", etcd.Name)
-		return vms
+		return vms, fmt.Errorf("storage provider is not recognized while fetching volume mounts")
 	}
-
-	if provider == utils.GCS {
+	switch provider {
+	case utils.Local:
+		vms = append(vms, v1.VolumeMount{
+			Name:      "host-storage",
+			MountPath: pointer.StringDeref(etcd.Spec.Backup.Store.Container, ""),
+		})
+	case utils.GCS:
 		vms = append(vms, v1.VolumeMount{
 			Name:      "etcd-backup",
 			MountPath: "/var/.gcp/",
 		})
-	} else if provider == utils.S3 || provider == utils.ABS || provider == utils.OSS || provider == utils.Swift || provider == utils.OCS {
+	case utils.S3, utils.ABS, utils.OSS, utils.Swift, utils.OCS:
 		vms = append(vms, v1.VolumeMount{
 			Name:      "etcd-backup",
 			MountPath: "/var/etcd-backup/",
 		})
 	}
 
-	return vms
+	return vms, nil
 }
 
-func getCompactionJobVolumes(etcd *druidv1alpha1.Etcd, logger logr.Logger) []v1.Volume {
+func getCompactionJobVolumes(ctx context.Context, cl client.Client, logger logr.Logger, etcd *druidv1alpha1.Etcd) ([]v1.Volume, error) {
 	vs := []v1.Volume{
 		{
 			Name: "etcd-workspace-dir",
@@ -374,22 +399,31 @@ func getCompactionJobVolumes(etcd *druidv1alpha1.Etcd, logger logr.Logger) []v1.
 		},
 	}
 
-	if etcd.Spec.Backup.Store == nil {
-		return vs
-	}
-
 	storeValues := etcd.Spec.Backup.Store
 	provider, err := utils.StorageProviderFromInfraProvider(storeValues.Provider)
 	if err != nil {
-		logger.Error(err, "Storage provider is not recognized. Compaction job will fail as no storage could be configured", "namespace", etcd.Namespace, "name", etcd.Name)
-		return vs
+		return vs, fmt.Errorf("could not recognize storage provider while fetching volumes")
 	}
+	switch provider {
+	case "Local":
+		hostPath, err := utils.GetHostMountPathFromSecretRef(ctx, cl, logger, storeValues, etcd.Namespace)
+		if err != nil {
+			return vs, fmt.Errorf("could not determine host mount path for local provider")
+		}
 
-	if provider == utils.GCS || provider == utils.S3 || provider == utils.OSS || provider == utils.ABS || provider == utils.Swift || provider == utils.OCS {
+		hpt := v1.HostPathDirectory
+		vs = append(vs, v1.Volume{
+			Name: "host-storage",
+			VolumeSource: v1.VolumeSource{
+				HostPath: &v1.HostPathVolumeSource{
+					Path: hostPath + "/" + pointer.StringDeref(storeValues.Container, ""),
+					Type: &hpt,
+				},
+			},
+		})
+	case utils.GCS, utils.S3, utils.OSS, utils.ABS, utils.Swift, utils.OCS:
 		if storeValues.SecretRef == nil {
-			logger.Info("No secretRef is configured for backup store. Compaction job will fail as no storage could be configured.",
-				"namespace", etcd.Namespace, "name", etcd.Name)
-			return vs
+			return vs, fmt.Errorf("could not configure secretRef for backup store %v", provider)
 		}
 
 		vs = append(vs, v1.Volume{
@@ -402,14 +436,11 @@ func getCompactionJobVolumes(etcd *druidv1alpha1.Etcd, logger logr.Logger) []v1.
 		})
 	}
 
-	return vs
+	return vs, nil
 }
 
-func getCompactionJobEnvVar(etcd *druidv1alpha1.Etcd, logger logr.Logger) []v1.EnvVar {
+func getCompactionJobEnvVar(etcd *druidv1alpha1.Etcd) ([]v1.EnvVar, error) {
 	var env []v1.EnvVar
-	if etcd.Spec.Backup.Store == nil {
-		return env
-	}
 
 	storeValues := etcd.Spec.Backup.Store
 
@@ -418,8 +449,7 @@ func getCompactionJobEnvVar(etcd *druidv1alpha1.Etcd, logger logr.Logger) []v1.E
 
 	provider, err := utils.StorageProviderFromInfraProvider(etcd.Spec.Backup.Store.Provider)
 	if err != nil {
-		logger.Error(err, "Storage provider is not recognized. Compaction job will likely fail as there is no provider specific credentials.", "namespace", etcd.Namespace, "name", etcd.Name)
-		return env
+		return env, fmt.Errorf("storage provider is not recognized while fetching secrets from environment variable")
 	}
 
 	switch provider {
@@ -435,9 +465,7 @@ func getCompactionJobEnvVar(etcd *druidv1alpha1.Etcd, logger logr.Logger) []v1.E
 		env = append(env, getEnvVarFromValues("ALICLOUD_APPLICATION_CREDENTIALS", "/var/etcd-backup"))
 	case utils.ECS:
 		if storeValues.SecretRef == nil {
-			logger.Info("No secretRef is configured for backup store. Compaction job will fail as no storage could be configured.",
-				"namespace", etcd.Namespace, "name", etcd.Name)
-			return env
+			return env, fmt.Errorf("no secretRef could be configured for backup store of ECS")
 		}
 
 		env = append(env, getEnvVarFromSecrets("ECS_ENDPOINT", storeValues.SecretRef.Name, "endpoint"))
@@ -447,7 +475,7 @@ func getCompactionJobEnvVar(etcd *druidv1alpha1.Etcd, logger logr.Logger) []v1.E
 		env = append(env, getEnvVarFromValues("OPENSHIFT_APPLICATION_CREDENTIALS", "/var/etcd-backup"))
 	}
 
-	return env
+	return env, nil
 }
 
 func getEnvVarFromValues(name, value string) v1.EnvVar {
diff --git a/test/integration/controllers/compaction/reconciler_test.go b/test/integration/controllers/compaction/reconciler_test.go
index b6c7d90e0..ecce0ca44 100644
--- a/test/integration/controllers/compaction/reconciler_test.go
+++ b/test/integration/controllers/compaction/reconciler_test.go
@@ -87,7 +87,7 @@ var _ = Describe("Compaction Controller", func() {
 			deleteEtcdSnapshotLeasesAndWait(k8sClient, instance)
 		},
 
-		Entry("if fields are set in etcd.Spec and TLS enabled, the resources should reflect the spec changes", "foo71", druidv1alpha1.StorageProvider("Local"), validateEtcdForCompactionJob),
+		Entry("if fields are set in etcd.Spec and TLS enabled, the resources should reflect the spec changes", "foo71", druidv1alpha1.StorageProvider("local"), validateEtcdForCompactionJob),
 		Entry("if the store is S3, the statefulset and compaction job should reflect the spec changes", "foo72", druidv1alpha1.StorageProvider("aws"), validateStoreAWSForCompactionJob),
 		Entry("if the store is ABS, the statefulset and compaction job should reflect the spec changes", "foo73", druidv1alpha1.StorageProvider("azure"), validateStoreAzureForCompactionJob),
 		Entry("if the store is GCS, the statefulset and compaction job should reflect the spec changes", "foo74", druidv1alpha1.StorageProvider("gcp"), validateStoreGCPForCompactionJob),
@@ -112,7 +112,7 @@ var _ = Describe("Compaction Controller", func() {
 		ctx, cancel := context.WithTimeout(context.TODO(), timeout)
 		defer cancel()
 
-		instance = testutils.EtcdBuilderWithDefaults("foo77", namespace).Build()
+		instance = testutils.EtcdBuilderWithDefaults("foo77", namespace).WithProviderLocal().Build()
 		createEtcdAndWait(k8sClient, instance)
 
 		// manually create full and delta snapshot leases since etcd controller is not running
@@ -318,6 +318,14 @@ func validateEtcdForCompactionJob(instance *druidv1alpha1.Etcd, j *batchv1.Job)
 						})),
 					}),
 				}),
+				"host-storage": MatchFields(IgnoreExtras, Fields{
+					"Name": Equal("host-storage"),
+					"VolumeSource": MatchFields(IgnoreExtras, Fields{
+						"HostPath": PointTo(MatchFields(IgnoreExtras, Fields{
+							"Path": Equal("/etc/gardener/local-backupbuckets/default.bkp"),
+						})),
+					}),
+				}),
 			}),
 		}),
 	}),
diff --git a/test/utils/etcd.go b/test/utils/etcd.go
index 514d6fefb..9935ef64b 100644
--- a/test/utils/etcd.go
+++ b/test/utils/etcd.go
@@ -166,6 +166,8 @@ func (eb *EtcdBuilder) WithStorageProvider(provider druidv1alpha1.StorageProvide
 		return eb.WithProviderGCS()
 	case "openstack":
 		return eb.WithProviderSwift()
+	case "local":
+		return eb.WithProviderLocal()
 	default:
 		return eb
 	}
@@ -226,6 +228,16 @@ func (eb *EtcdBuilder) WithProviderOSS() *EtcdBuilder {
 	return eb
 }
 
+func (eb *EtcdBuilder) WithProviderLocal() *EtcdBuilder {
+	if eb == nil || eb.etcd == nil {
+		return nil
+	}
+	eb.etcd.Spec.Backup.Store = getBackupStoreForLocal(
+		eb.etcd.Name,
+	)
+	return eb
+}
+
 func (eb *EtcdBuilder) Build() *druidv1alpha1.Etcd {
 	return eb.etcd
 }
@@ -361,6 +373,15 @@ func getBackupStore(name string, provider druidv1alpha1.StorageProvider) *druidv
 	}
 }
 
+func getBackupStoreForLocal(name string) *druidv1alpha1.StoreSpec {
+	provider := druidv1alpha1.StorageProvider("local")
+	return &druidv1alpha1.StoreSpec{
+		Container: &container,
+		Prefix:    name,
+		Provider:  &provider,
+	}
+}
+
 func CheckEtcdOwnerReference(refs []metav1.OwnerReference, etcd *druidv1alpha1.Etcd) bool {
 	for _, ownerRef := range refs {
 		if ownerRef.UID == etcd.UID {