Rework snapshot repositories handling (#752)
### Description
This PR reworks how the operator provisions snapshot repositories. The
beta feature used a Kubernetes job running curl. With this rework, the
operator makes the required API calls itself using an HTTP client. I've
also extracted the logic into its own reconciler for better separation
and testing.
This reimplementation also removes the limitation that snapshot
repositories could only be configured after cluster provisioning, which
made the feature unusable for any kind of GitOps approach.
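
For illustration, the kind of call that replaces the curl job can be sketched with the standard library alone. This is a minimal sketch, not the operator's code: `putRepository` and `newFakeCluster` are hypothetical names, the fake server stands in for OpenSearch, and authentication and TLS are left out.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// SnapshotRepository mirrors the request body sent to /_snapshot/<name>.
type SnapshotRepository struct {
	Type     string            `json:"type"`
	Settings map[string]string `json:"settings,omitempty"`
}

// putRepository (hypothetical) registers a repository with PUT /_snapshot/<name>.
func putRepository(client *http.Client, baseURL, name string, repo SnapshotRepository) error {
	body, err := json.Marshal(repo)
	if err != nil {
		return err
	}
	req, err := http.NewRequest(http.MethodPut, baseURL+"/_snapshot/"+name, bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode > 299 {
		msg, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("failed to create snapshot repository: %s: %s", resp.Status, msg)
	}
	return nil
}

// newFakeCluster (hypothetical) returns a test server that records PUT bodies by path.
func newFakeCluster(received map[string]string) *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPut {
			w.WriteHeader(http.StatusMethodNotAllowed)
			return
		}
		body, _ := io.ReadAll(r.Body)
		received[r.URL.Path] = string(body)
		fmt.Fprint(w, `{"acknowledged":true}`)
	}))
}

func main() {
	received := map[string]string{}
	srv := newFakeCluster(received)
	defer srv.Close()

	repo := SnapshotRepository{Type: "s3", Settings: map[string]string{"bucket": "opensearch-s3-snapshot"}}
	if err := putRepository(srv.Client(), srv.URL, "os-snap", repo); err != nil {
		fmt.Println("error:", err)
		return
	}
	// Print the JSON body the fake cluster received for this repository.
	fmt.Println(received["/_snapshot/os-snap"])
}
```

Because the request is issued from the operator process, a failure surfaces as a Go error that a reconciler can retry, rather than as a failed job pod.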

### Issues Resolved
Partially #621 

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and
signing off your commits, please check
[here](https://github.com/opensearch-project/OpenSearch/blob/main/CONTRIBUTING.md#developer-certificate-of-origin).

Signed-off-by: Sebastian Woehrl <[email protected]>
swoehrl-mw authored Mar 25, 2024
1 parent 0494837 commit 0d0a515
Showing 11 changed files with 571 additions and 158 deletions.
12 changes: 4 additions & 8 deletions docs/userguide/main.md
@@ -364,11 +364,9 @@ spec:

This will configure an init container for each opensearch pod that executes the needed `sysctl` command. By default the init container uses a busybox image. If you want to change that (for example to use an image from a private registry), see [Custom init helper](#custom-init-helper).

### Configuring Snapshot Repo (BETA)
### Configuring Snapshot Repositories

This feature is Currently in BETA, you can configure the snapshot repo settings for the OpenSearch cluster through the operator. Using `snapshotRepositories` settings you can configure multiple snapshot repos. Once the snapshot repo is configured a user can create custom `_ism` policies through dashboard to backup the in indexes.

Note: BETA flagged Features in a release are experimental. Therefore, we do not recommend the use of configuring snapshot repo in a production environment. For updates on the progress of snapshot/restore, or if you want leave feedback/contribute that could help improve the feature, please refer to the issue on [GitHub](https://github.com/opensearch-project/opensearch-k8s-operator/issues/278).
You can configure the snapshot repositories for the OpenSearch cluster through the operator. Using `general.snapshotRepositories` settings you can configure multiple snapshot repositories. Once the snapshot repository is configured a user can create custom `_ism` policies through dashboard to backup indexes.

```yaml
spec:
@@ -390,7 +388,7 @@ spec:

#### Prerequisites for Configuring Snapshot Repo

Before applying the setting `snapshotRepositories` to the operator, please ensure the following prerequisites are met.
Before configuring `snapshotRepositories` for a cluster, please ensure the following prerequisites are met:

1. The right cloud provider native plugins are installed. For example:

@@ -400,9 +398,7 @@ Before applying the setting `snapshotRepositories` to the operator, please ensur
pluginsList: ["repository-s3"]
```

2. Ensure the cluster is fully healthy before applying the `snapshotRepositories` settings to the custom resource. Note: For the BETA you cannot add the settings if the cluster is not yet provisioned and healthy, otherwise the configuration of the repositories will fail.

3. The required roles/permissions for the backend cloud are pre-created. Example: Following is the AWS IAM role added for kubernetes nodes so that snapshots can be published to `opensearch-s3-snapshot` s3 bucket.
2. The required roles/permissions for the backend cloud are pre-created. An example AWS IAM role added for kubernetes nodes so that snapshots can be published to the `opensearch-s3-snapshot` s3 bucket:

```json
{
7 changes: 7 additions & 0 deletions opensearch-operator/controllers/opensearchController.go
@@ -304,6 +304,12 @@ func (r *OpenSearchClusterReconciler) reconcilePhaseRunning(ctx context.Context)
&reconcilerContext,
r.Instance,
)
snapshotrepository := reconcilers.NewSnapshotRepositoryReconciler(
r.Client,
ctx,
r.Recorder,
r.Instance,
)

componentReconcilers := []reconcilers.ComponentReconciler{
tls.Reconcile,
@@ -314,6 +320,7 @@ func (r *OpenSearchClusterReconciler) reconcilePhaseRunning(ctx context.Context)
dashboards.Reconcile,
upgrade.Reconcile,
restart.Reconcile,
snapshotrepository.Reconcile,
}
for _, rec := range componentReconcilers {
result, err := rec()
@@ -0,0 +1,6 @@
package requests

type SnapshotRepository struct {
Type string `json:"type"`
Settings map[string]string `json:"settings,omitempty"`
}
@@ -0,0 +1,5 @@
package responses

import "github.com/Opster/opensearch-k8s-operator/opensearch-operator/opensearch-gateway/requests"

type SnapshotRepositoryResponse = map[string]requests.SnapshotRepository
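
As a rough illustration of why the response type is a map: a `GET /_snapshot/<name>` call returns a JSON object keyed by repository name. The sketch below copies the two types locally so it is self-contained; `decodeRepository` is a hypothetical helper and the sample body is made up for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// SnapshotRepository is a local copy of requests.SnapshotRepository from the diff.
type SnapshotRepository struct {
	Type     string            `json:"type"`
	Settings map[string]string `json:"settings,omitempty"`
}

// SnapshotRepositoryResponse is a local copy of the alias from the diff.
type SnapshotRepositoryResponse = map[string]SnapshotRepository

// decodeRepository (hypothetical) decodes a response body and picks out one repository.
// The boolean reports whether the name was present in the response.
func decodeRepository(body, name string) (SnapshotRepository, bool, error) {
	resp := SnapshotRepositoryResponse{}
	if err := json.NewDecoder(strings.NewReader(body)).Decode(&resp); err != nil {
		return SnapshotRepository{}, false, err
	}
	repo, ok := resp[name]
	return repo, ok, nil
}

func main() {
	// Sample body, invented for illustration.
	body := `{"os-snap":{"type":"s3","settings":{"bucket":"opensearch-s3-snapshot","region":"us-east-1"}}}`
	repo, ok, err := decodeRepository(body, "os-snap")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(ok, repo.Type, repo.Settings["bucket"])
}
```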
35 changes: 35 additions & 0 deletions opensearch-operator/opensearch-gateway/services/os_client.go
@@ -332,6 +332,30 @@ func (client *OsClusterClient) DeleteISMConfig(ctx context.Context, name string)
return doHTTPDelete(ctx, client.client, path)
}

// performs an HTTP GET request to OS to get the snapshot repository specified by name
func (client *OsClusterClient) GetSnapshotRepository(ctx context.Context, name string) (*opensearchapi.Response, error) {
path := generateAPIPathSnapshotRepository(name)
return doHTTPGet(ctx, client.client, path)
}

// performs an HTTP PUT request to OS to create the snapshot repository specified by name
func (client *OsClusterClient) CreateSnapshotRepository(ctx context.Context, name string, body io.Reader) (*opensearchapi.Response, error) {
path := generateAPIPathSnapshotRepository(name)
return doHTTPPut(ctx, client.client, path, body)
}

// performs an HTTP PUT request to OS to update the snapshot repository specified by name
func (client *OsClusterClient) UpdateSnapshotRepository(ctx context.Context, name string, body io.Reader) (*opensearchapi.Response, error) {
path := generateAPIPathSnapshotRepository(name)
return doHTTPPut(ctx, client.client, path, body)
}

// DeleteSnapshotRepository performs an HTTP DELETE request to OS to delete the snapshot repository specified by name
func (client *OsClusterClient) DeleteSnapshotRepository(ctx context.Context, name string) (*opensearchapi.Response, error) {
path := generateAPIPathSnapshotRepository(name)
return doHTTPDelete(ctx, client.client, path)
}

// generateAPIPathISM generates a URI PATH for a specific resource endpoint and name
// For example: resource = _ism, name = example
// URI PATH = '_plugins/_ism/policies/example'
@@ -388,3 +412,14 @@ func generateAPIPath(resource, name string) strings.Builder {
path.WriteString(name)
return path
}

// generates a URI PATH for a given snapshot repository name
func generateAPIPathSnapshotRepository(name string) strings.Builder {
var path strings.Builder
path.Grow(1 + len("_snapshot") + 1 + len(name))
path.WriteString("/")
path.WriteString("_snapshot")
path.WriteString("/")
path.WriteString(name)
return path
}
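
To make the resulting path concrete, here is a small sketch of the same construction. `snapshotRepositoryPath` is a hypothetical variant that returns a `string` instead of a `strings.Builder`; it is not part of the commit.

```go
package main

import (
	"fmt"
	"strings"
)

// snapshotRepositoryPath (hypothetical) builds the same path as
// generateAPIPathSnapshotRepository, but returns it as a string.
func snapshotRepositoryPath(name string) string {
	var path strings.Builder
	// Reserve space for "/_snapshot/" plus the repository name.
	path.Grow(len("/_snapshot/") + len(name))
	path.WriteString("/_snapshot/")
	path.WriteString(name)
	return path.String()
}

func main() {
	fmt.Println(snapshotRepositoryPath("os-snap")) // prints "/_snapshot/os-snap"
}
```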
113 changes: 113 additions & 0 deletions opensearch-operator/opensearch-gateway/services/os_snapshot_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package services

import (
"context"
"encoding/json"
"errors"
"fmt"

"github.com/Opster/opensearch-k8s-operator/opensearch-operator/opensearch-gateway/requests"
"github.com/Opster/opensearch-k8s-operator/opensearch-operator/opensearch-gateway/responses"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/opensearch-project/opensearch-go/opensearchutil"
"sigs.k8s.io/controller-runtime/pkg/log"
)

var ErrRepoNotFound = errors.New("snapshotRepository not found")

// checks if the passed SnapshotRepository is same as existing or needs update
func ShouldUpdateSnapshotRepository(ctx context.Context, newRepository, existingRepository requests.SnapshotRepository) (bool, error) {
if cmp.Equal(newRepository, existingRepository, cmpopts.EquateEmpty()) {
return false, nil
}
lg := log.FromContext(ctx).WithValues("os_service", "snapshotrepository")
lg.V(1).Info(fmt.Sprintf("existing SnapshotRepository: %+v", existingRepository))
lg.V(1).Info(fmt.Sprintf("new SnapshotRepository: %+v", newRepository))
lg.Info("snapshotRepository exists and requires update")
return true, nil
}

// checks if the snapshot repository with the given name already exists
func SnapshotRepositoryExists(ctx context.Context, service *OsClusterClient, repositoryName string) (bool, error) {
resp, err := service.GetSnapshotRepository(ctx, repositoryName)
if err != nil {
return false, err
}
defer resp.Body.Close()
if resp.StatusCode == 404 {
return false, nil
} else if resp.IsError() {
return false, fmt.Errorf("response from API is %s", resp.Status())
}
return true, nil
}

// fetches the snapshot repository with the given name
func GetSnapshotRepository(ctx context.Context, service *OsClusterClient, repositoryName string) (*requests.SnapshotRepository, error) {
resp, err := service.GetSnapshotRepository(ctx, repositoryName)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode == 404 {
return nil, ErrRepoNotFound
} else if resp.IsError() {
return nil, fmt.Errorf("response from API is %s", resp.Status())
}
repoResponse := responses.SnapshotRepositoryResponse{}
if resp != nil && resp.Body != nil {
err := json.NewDecoder(resp.Body).Decode(&repoResponse)
if err != nil {
return nil, err
}
// the opensearch api returns a map of name -> repo config, so we extract the one for the repo we need
repo, exists := repoResponse[repositoryName]
if !exists {
return nil, ErrRepoNotFound
}
return &repo, nil
}
return nil, fmt.Errorf("response is empty")
}

// creates the given SnapshotRepository
func CreateSnapshotRepository(ctx context.Context, service *OsClusterClient, repositoryName string, repository requests.SnapshotRepository) error {
spec := opensearchutil.NewJSONReader(repository)
resp, err := service.CreateSnapshotRepository(ctx, repositoryName, spec)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.IsError() {
return fmt.Errorf("failed to create snapshot repository: %s", resp.String())
}
return nil
}

// updates the given SnapshotRepository
func UpdateSnapshotRepository(ctx context.Context, service *OsClusterClient, repositoryName string, repository requests.SnapshotRepository) error {
spec := opensearchutil.NewJSONReader(repository)
resp, err := service.UpdateSnapshotRepository(ctx, repositoryName, spec)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.IsError() {
return fmt.Errorf("failed to update snapshot repository: %s", resp.String())
}
return nil
}

// deletes the given SnapshotRepository
func DeleteSnapshotRepository(ctx context.Context, service *OsClusterClient, repositoryName string) error {
resp, err := service.DeleteSnapshotRepository(ctx, repositoryName)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.IsError() {
return fmt.Errorf("failed to delete snapshot repository: %s", resp.String())
}
return nil
}
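
The reconciler that calls these service functions is not shown in this part of the diff. As a rough sketch of how a caller might combine them (check existence, create if missing, otherwise compare and update), under the assumption that an in-memory map stands in for the cluster: `repoStore`, `sameRepository` and `ensureRepository` are hypothetical names, and `sameRepository` is a standard-library approximation of the `cmp.Equal` plus `cmpopts.EquateEmpty()` comparison, not the same call.

```go
package main

import "fmt"

// SnapshotRepository is a local copy of the request type from the diff.
type SnapshotRepository struct {
	Type     string
	Settings map[string]string
}

// repoStore (hypothetical) stands in for the repositories registered in OpenSearch.
type repoStore map[string]SnapshotRepository

// sameRepository reports whether two repositories match, treating nil and
// empty settings maps as equal.
func sameRepository(a, b SnapshotRepository) bool {
	if a.Type != b.Type || len(a.Settings) != len(b.Settings) {
		return false
	}
	for key, value := range a.Settings {
		if other, ok := b.Settings[key]; !ok || other != value {
			return false
		}
	}
	return true
}

// ensureRepository creates or updates the repository as needed and returns
// the action taken: "created", "updated" or "unchanged".
func ensureRepository(store repoStore, name string, desired SnapshotRepository) string {
	existing, found := store[name]
	if !found {
		store[name] = desired
		return "created"
	}
	if sameRepository(existing, desired) {
		return "unchanged"
	}
	store[name] = desired
	return "updated"
}

func main() {
	store := repoStore{}
	desired := SnapshotRepository{Type: "s3", Settings: map[string]string{"bucket": "opensearch-s3-snapshot"}}

	fmt.Println(ensureRepository(store, "os-snap", desired)) // prints "created"
	fmt.Println(ensureRepository(store, "os-snap", desired)) // prints "unchanged"

	desired.Settings = map[string]string{"bucket": "another-bucket"}
	fmt.Println(ensureRepository(store, "os-snap", desired)) // prints "updated"
}
```

Running the same desired state twice leaves the store untouched, which is the property that lets such a step run on every reconcile loop.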
83 changes: 4 additions & 79 deletions opensearch-operator/pkg/builders/cluster.go
@@ -23,11 +23,10 @@ import (
/// package that declare and build all the resources that related to the OpenSearch cluster ///

const (
ConfigurationChecksumAnnotation = "opster.io/config"
DefaultDiskSize = "30Gi"
defaultMonitoringPlugin = "https://github.com/aiven/prometheus-exporter-plugin-for-opensearch/releases/download/%s.0/prometheus-exporter-%s.0.zip"
securityconfigChecksumAnnotation = "securityconfig/checksum"
snapshotRepoConfigChecksumAnnotation = "snapshotrepoconfig/checksum"
ConfigurationChecksumAnnotation = "opster.io/config"
DefaultDiskSize = "30Gi"
defaultMonitoringPlugin = "https://github.com/aiven/prometheus-exporter-plugin-for-opensearch/releases/download/%s.0/prometheus-exporter-%s.0.zip"
securityconfigChecksumAnnotation = "securityconfig/checksum"
)

func NewSTSForNodePool(
@@ -978,80 +977,6 @@ func STSInNodePools(sts appsv1.StatefulSet, nodepools []opsterv1.NodePool) bool
return false
}

func NewSnapshotRepoconfigUpdateJob(
instance *opsterv1.OpenSearchCluster,
jobName string,
namespace string,
checksum string,
volumes []corev1.Volume,
volumeMounts []corev1.VolumeMount,
) batchv1.Job {
httpPort, _, _ := helpers.VersionCheck(instance)
dns := DnsOfService(instance)
var snapshotCmd string
for _, repository := range instance.Spec.General.SnapshotRepositories {
var snapshotSettings string

// Sort keys to have a stable order
keys := helpers.SortedKeys(repository.Settings)
for _, settingsKey := range keys {
snapshotSettings += fmt.Sprintf("\"%s\": \"%s\" , ", settingsKey, repository.Settings[settingsKey])
}
snapshotSettings = strings.TrimRight(snapshotSettings, " ,")
snapshotCmd += fmt.Sprintf("curl --fail-with-body -s -k -u \"$(cat /mnt/admin-credentials/username):$(cat /mnt/admin-credentials/password)\" -X PUT https://%s.svc.cluster.local:%v/_snapshot/%s?pretty -H \"Content-Type: application/json\" -d %c{\"type\": \"%s\", \"settings\": {%s}}%c; ", dns, fmt.Sprint(httpPort), repository.Name, '\'', repository.Type, snapshotSettings, '\'')
}
terminationGracePeriodSeconds := int64(5)
backoffLimit := int32(0)

node := opsterv1.NodePool{
Component: "snapshotconfig",
}
annotations := map[string]string{
snapshotRepoConfigChecksumAnnotation: checksum,
}
image := helpers.ResolveImage(instance, &node)

volumes = append(volumes, corev1.Volume{
Name: "admin-credentials",
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{SecretName: fmt.Sprintf("%s-admin-password", instance.Name)},
},
})
volumeMounts = append(volumeMounts, corev1.VolumeMount{
Name: "admin-credentials",
MountPath: "/mnt/admin-credentials",
})

podSecurityContext := instance.Spec.General.PodSecurityContext
securityContext := instance.Spec.General.SecurityContext

return batchv1.Job{
ObjectMeta: metav1.ObjectMeta{Name: jobName, Namespace: namespace, Annotations: annotations},
Spec: batchv1.JobSpec{
BackoffLimit: &backoffLimit,
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Name: jobName},
Spec: corev1.PodSpec{
TerminationGracePeriodSeconds: &terminationGracePeriodSeconds,
Containers: []corev1.Container{{
Name: "snapshotrepoconfig",
Image: image.GetImage(),
ImagePullPolicy: image.GetImagePullPolicy(),
Command: []string{"/bin/bash", "-c"},
Args: []string{snapshotCmd},
VolumeMounts: volumeMounts,
SecurityContext: securityContext,
}},
ServiceAccountName: instance.Spec.General.ServiceAccount,
RestartPolicy: corev1.RestartPolicyNever,
Volumes: volumes,
SecurityContext: podSecurityContext,
},
},
},
}
}

func NewSecurityconfigUpdateJob(
instance *opsterv1.OpenSearchCluster,
jobName string,
20 changes: 0 additions & 20 deletions opensearch-operator/pkg/builders/cluster_test.go
@@ -386,26 +386,6 @@ var _ = Describe("Builders", func() {
})
})

When("When Reconciling the snapshotRepoJob", func() {
It("should create a snapshotconfig batch job", func() {
clusterObject := ClusterDescWithVersion("2.2.1")
clusterObject.ObjectMeta.Namespace = "snapshot"
clusterObject.Spec.General.ServiceName = "snapshotservice"

snapshotRepoSettings := map[string]string{"bucket": "opensearch-s3-snapshot", "region": "us-east-1", "base_path": "os-snapshot"}
snapshotConfig := opsterv1.SnapshotRepoConfig{
Name: "os-snap",
Type: "s3",
Settings: snapshotRepoSettings,
}
clusterObject.Spec.General.SnapshotRepositories = []opsterv1.SnapshotRepoConfig{snapshotConfig}
result := NewSnapshotRepoconfigUpdateJob(&clusterObject, "snapshotrepoconfig", "foobar", "snapshotrepoconfig/checksum", nil, nil)
Expect(result.Spec.Template.Spec.Containers[0].Name).To(Equal("snapshotrepoconfig"))
snapshotCmd := "curl --fail-with-body -s -k -u \"$(cat /mnt/admin-credentials/username):$(cat /mnt/admin-credentials/password)\" -X PUT https://snapshotservice.snapshot.svc.cluster.local:9200/_snapshot/os-snap?pretty -H \"Content-Type: application/json\" -d '{\"type\": \"s3\", \"settings\": {\"base_path\": \"os-snapshot\" , \"bucket\": \"opensearch-s3-snapshot\" , \"region\": \"us-east-1\"}}'; "
Expect(result.Spec.Template.Spec.Containers[0].Args).To(ContainElement(snapshotCmd))
})
})

When("Constructing a bootstrap pod", func() {
It("should use General.DefaultRepo for the InitHelper image if configured", func() {
clusterObject := ClusterDescWithVersion("2.2.1")