Skip to content

Commit

Permalink
Merge pull request #241 from girishc13/migrate-to-persistent-cluster-…
Browse files Browse the repository at this point in the history
…settings

Migrate to persistent cluster settings
  • Loading branch information
otrosien authored Mar 25, 2022
2 parents 760b1b4 + 1294fa2 commit a74287c
Show file tree
Hide file tree
Showing 31 changed files with 400 additions and 66 deletions.
19 changes: 19 additions & 0 deletions docs/migrating-from-transient-settings.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Migrating away from Transient Cluster Settings

The [transient cluster settings](https://www.elastic.co/guide/en/elasticsearch/reference/7.16/settings.html#cluster-setting-types)
are
being [deprecated](https://www.elastic.co/guide/en/elasticsearch/reference/7.16/migrating-7.16.html#breaking_716_settings_deprecations)
from ES 7.16.0. The es-operator was relying on the transient cluster settings for operating on the cluster because
transient settings have the highest priority. The es-operator controlled mainly the cluster rebalance and the exclude
ips list for the cluster scaling actions. Moving forward the es-operator will now exclusively operate only edit the
persistent cluster settings. Some teams might still rely on using the transient settings for manual cluster operations
which can inadvertently cause issues with the es-operator updates to the cluster settings causing an inconsistent
cluster state. To avoid this the es-operator is also copying any existing non empty transient settings related to the
cluster rebalance and the exclude ips list into the persistent settings before updating the new values for the
persistent settings. To avoid cluster inconsistencies with the new es-operator, we recommend the below migrations steps
before deploying the new es-operator.

1. Follow the
official [Transient settings migration guide](https://www.elastic.co/guide/en/elasticsearch/reference/8.1/transient-settings-migration-guide.html).
2. Update any custom scripts that are still operating on transient settings.
3. Deploy the new es-operator.
141 changes: 106 additions & 35 deletions operator/es_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

log "github.com/sirupsen/logrus"
"github.com/zalando-incubator/es-operator/operator/null"
"gopkg.in/resty.v1"
v1 "k8s.io/api/core/v1"
)
Expand Down Expand Up @@ -67,22 +68,86 @@ type ESHealth struct {
Status string `json:"status"`
}

type Exclude struct {
IP null.String `json:"_ip,omitempty"`
}

type Allocation struct {
Exclude Exclude `json:"exclude,omitempty"`
}
type Rebalance struct {
Enable null.String `json:"enable,omitempty"`
}

type Routing struct {
Allocation Allocation `json:"allocation,omitempty"`
Rebalance Rebalance `json:"rebalance,omitempty"`
}

type Cluster struct {
Routing Routing `json:"routing,omitempty"`
}

type ClusterSettings struct {
Cluster Cluster `json:"cluster"`
}

// ESSettings represent response from _cluster/settings
type ESSettings struct {
Transient struct {
Cluster struct {
Routing struct {
Allocation struct {
Exclude struct {
IP string `json:"_ip"`
} `json:"exclude"`
} `json:"allocation"`
Rebalance struct {
Enable string `json:"enable"`
} `json:"rebalance"`
} `json:"routing"`
} `json:"cluster"`
} `json:"transient"`
Transient ClusterSettings `json:"transient,omitempty"`
Persistent ClusterSettings `json:"persistent,omitempty"`
}

func deduplicateIPs(excludedIPsString string) string {
if excludedIPsString == "" {
return ""
}

uniqueIPsMap := make(map[string]struct{})
uniqueIPsList := []string{}
excludedIPs := strings.Split(excludedIPsString, ",")
for _, excludedIP := range excludedIPs {
if _, ok := uniqueIPsMap[excludedIP]; !ok {
uniqueIPsMap[excludedIP] = struct{}{}
uniqueIPsList = append(uniqueIPsList, excludedIP)
}
}

return strings.Join(uniqueIPsList, ",")
}

func (esSettings *ESSettings) MergeNonEmptyTransientSettings() {
if value := esSettings.GetTransientRebalance().ValueOrZero(); value != "" {
esSettings.Persistent.Cluster.Routing.Rebalance.Enable = null.StringFromPtr(&value)
esSettings.Transient.Cluster.Routing.Rebalance.Enable = null.StringFromPtr(nil)
}

if transientExcludeIps := esSettings.GetTransientExcludeIPs().ValueOrZero(); transientExcludeIps != "" {
persistentExcludeIps := esSettings.GetPersistentExcludeIPs().ValueOrZero()
if persistentExcludeIps == "" {
esSettings.Persistent.Cluster.Routing.Allocation.Exclude.IP = null.StringFromPtr(&transientExcludeIps)
} else {
mergedIps := null.StringFrom(deduplicateIPs(transientExcludeIps + "," + persistentExcludeIps))
esSettings.Persistent.Cluster.Routing.Allocation.Exclude.IP = mergedIps
}
esSettings.Transient.Cluster.Routing.Allocation.Exclude.IP = null.StringFromPtr(nil)
}
}

func (esSettings *ESSettings) GetTransientExcludeIPs() null.String {
return esSettings.Transient.Cluster.Routing.Allocation.Exclude.IP
}

func (esSettings *ESSettings) GetPersistentExcludeIPs() null.String {
return esSettings.Persistent.Cluster.Routing.Allocation.Exclude.IP
}

func (esSettings *ESSettings) GetTransientRebalance() null.String {
return esSettings.Transient.Cluster.Routing.Rebalance.Enable
}

func (esSettings *ESSettings) GetPersistentRebalance() null.String {
return esSettings.Persistent.Cluster.Routing.Rebalance.Enable
}

func (c *ESClient) logger() *log.Entry {
Expand All @@ -101,7 +166,12 @@ func (c *ESClient) Drain(ctx context.Context, pod *v1.Pod) error {
return err
}
c.logger().Info("Disabling auto-rebalance")
err = c.updateAutoRebalance("none")
esSettings, err := c.getClusterSettings()
if err != nil {
return err
}

err = c.updateAutoRebalance("none", esSettings)
if err != nil {
return err
}
Expand Down Expand Up @@ -130,7 +200,7 @@ func (c *ESClient) Cleanup(ctx context.Context) error {
}

// 3. clean up exclude._ip settings based on known IPs from (1)
excludedIPsString := esSettings.Transient.Cluster.Routing.Allocation.Exclude.IP
excludedIPsString := esSettings.GetPersistentExcludeIPs().ValueOrZero()
excludedIPs := strings.Split(excludedIPsString, ",")
var newExcludedIPs []string
for _, excludeIP := range excludedIPs {
Expand All @@ -148,15 +218,15 @@ func (c *ESClient) Cleanup(ctx context.Context) error {
c.logger().Infof("Setting exclude list to '%s'", strings.Join(newExcludedIPs, ","))

// 4. update exclude._ip setting
err = c.setExcludeIPs(newExcludedIPsString)
err = c.setExcludeIPs(newExcludedIPsString, esSettings)
if err != nil {
return err
}
}

if esSettings.Transient.Cluster.Routing.Rebalance.Enable != "all" {
if esSettings.GetPersistentRebalance().ValueOrZero() != "all" {
c.logger().Info("Enabling auto-rebalance")
return c.updateAutoRebalance("all")
return c.updateAutoRebalance("all", esSettings)
}
return nil
}
Expand Down Expand Up @@ -198,6 +268,7 @@ func (c *ESClient) getClusterSettings() (*ESSettings, error) {
if err != nil {
return nil, err
}
esSettings.MergeNonEmptyTransientSettings()
return &esSettings, nil
}

Expand All @@ -214,7 +285,7 @@ func (c *ESClient) excludePodIP(pod *v1.Pod) error {
return err
}

excludeString := esSettings.Transient.Cluster.Routing.Allocation.Exclude.IP
excludeString := esSettings.GetPersistentExcludeIPs().ValueOrZero()

// add pod IP to exclude list
ips := []string{}
Expand All @@ -231,22 +302,18 @@ func (c *ESClient) excludePodIP(pod *v1.Pod) error {
if !foundPodIP {
ips = append(ips, podIP)
sort.Strings(ips)
err = c.setExcludeIPs(strings.Join(ips, ","))
err = c.setExcludeIPs(strings.Join(ips, ","), esSettings)
}

c.mux.Unlock()
return err
}

func (c *ESClient) setExcludeIPs(ips string) error {
func (c *ESClient) setExcludeIPs(ips string, originalESSettings *ESSettings) error {
originalESSettings.updateExcludeIps(ips)
resp, err := resty.New().R().
SetHeader("Content-Type", "application/json").
SetBody([]byte(
fmt.Sprintf(
`{"transient" : {"cluster.routing.allocation.exclude._ip" : "%s"}}`,
ips,
),
)).
SetBody(originalESSettings).
Put(c.Endpoint.String() + "/_cluster/settings")
if err != nil {
return err
Expand All @@ -257,15 +324,15 @@ func (c *ESClient) setExcludeIPs(ips string) error {
return nil
}

func (c *ESClient) updateAutoRebalance(value string) error {
func (esSettings *ESSettings) updateExcludeIps(ips string) {
esSettings.Persistent.Cluster.Routing.Allocation.Exclude.IP = null.StringFromPtr(&ips)
}

func (c *ESClient) updateAutoRebalance(value string, originalESSettings *ESSettings) error {
originalESSettings.updateRebalance(value)
resp, err := resty.New().R().
SetHeader("Content-Type", "application/json").
SetBody([]byte(
fmt.Sprintf(
`{"transient" : {"cluster.routing.rebalance.enable" : "%s"}}`,
value,
),
)).
SetBody(originalESSettings).
Put(c.Endpoint.String() + "/_cluster/settings")
if err != nil {
return err
Expand All @@ -276,6 +343,10 @@ func (c *ESClient) updateAutoRebalance(value string) error {
return nil
}

func (esSettings *ESSettings) updateRebalance(value string) {
esSettings.Persistent.Cluster.Routing.Rebalance.Enable = null.StringFromPtr(&value)
}

// repeatedly query shard allocations to ensure success of drain operation.
func (c *ESClient) waitForEmptyEsNode(ctx context.Context, pod *v1.Pod) error {
// TODO: implement context handling
Expand Down
Loading

0 comments on commit a74287c

Please sign in to comment.