Skip to content

Commit

Permalink
KO-382 cleanup after scaleup (#80)
Browse files Browse the repository at this point in the history
* trigger cleanup after scaleup
* update client helper to support content type and request body
  • Loading branch information
sandoichi authored May 8, 2020
1 parent b96bfd7 commit 4a71be4
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 18 deletions.
71 changes: 60 additions & 11 deletions operator/pkg/httphelper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@
package httphelper

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"time"

"github.com/go-logr/logr"
Expand All @@ -29,16 +32,17 @@ type nodeMgmtRequest struct {
host string
method string
timeout time.Duration
body []byte
}

func buildEndpoint(path string, queryParams ...string) string {
params := url.Values{}
for i := 0; i < len(queryParams) - 1; i = i + 2 {
for i := 0; i < len(queryParams)-1; i = i + 2 {
params[queryParams[i]] = []string{queryParams[i+1]}
}

url := &url.URL{
Path: path,
Path: path,
RawQuery: params.Encode(),
}
return url.String()
Expand Down Expand Up @@ -94,7 +98,7 @@ func (client *NodeMgmtClient) CallMetadataEndpointsEndpoint(pod *corev1.Pod) (Ca
method: http.MethodGet,
}

bytes, err := callNodeMgmtEndpoint(client, request)
bytes, err := callNodeMgmtEndpoint(client, request, "")
if err != nil {
return CassMetadataEndpoints{}, err

Expand Down Expand Up @@ -126,7 +130,7 @@ func (client *NodeMgmtClient) CallCreateRoleEndpoint(pod *corev1.Pod, username s
host: BuildPodHostFromPod(pod),
method: http.MethodPost,
}
_, err := callNodeMgmtEndpoint(client, request)
_, err := callNodeMgmtEndpoint(client, request, "")
return err
}

Expand All @@ -142,7 +146,7 @@ func (client *NodeMgmtClient) CallProbeClusterEndpoint(pod *corev1.Pod, consiste
method: http.MethodGet,
}

_, err := callNodeMgmtEndpoint(client, request)
_, err := callNodeMgmtEndpoint(client, request, "")
return err
}

Expand All @@ -159,7 +163,42 @@ func (client *NodeMgmtClient) CallDrainEndpoint(pod *corev1.Pod) error {
timeout: time.Minute * 2,
}

_, err := callNodeMgmtEndpoint(client, request)
_, err := callNodeMgmtEndpoint(client, request, "")
return err
}

func (client *NodeMgmtClient) CallKeyspaceCleanupEndpoint(pod *corev1.Pod, jobs int, keyspaceName string, tables []string) error {
client.Log.Info(
"calling Management API keyspace cleanup - POST /api/v0/ops/keyspace/cleanup",
"pod", pod.Name,
)
postData := make(map[string]interface{})
if jobs > -1 {
postData["jobs"] = strconv.Itoa(jobs)
}

if keyspaceName != "" {
postData["keyspace_name"] = keyspaceName
}

if len(tables) > 0 {
postData["tables"] = tables
}

body, err := json.Marshal(postData)
if err != nil {
return err
}

request := nodeMgmtRequest{
endpoint: fmt.Sprintf("/api/v0/ops/keyspace/cleanup"),
host: BuildPodHostFromPod(pod),
method: http.MethodPost,
timeout: time.Second * 20,
body: body,
}

_, err = callNodeMgmtEndpoint(client, request, "application/json")
return err
}

Expand All @@ -176,7 +215,7 @@ func (client *NodeMgmtClient) CallLifecycleStartEndpointWithReplaceIp(pod *corev
)

endpoint := "/api/v0/lifecycle/start"

if replaceIp != "" {
endpoint = buildEndpoint(endpoint, "replace_ip", replaceIp)
}
Expand All @@ -188,7 +227,7 @@ func (client *NodeMgmtClient) CallLifecycleStartEndpointWithReplaceIp(pod *corev
timeout: 10 * time.Second,
}

_, err := callNodeMgmtEndpoint(client, request)
_, err := callNodeMgmtEndpoint(client, request, "")
return err
}

Expand All @@ -208,15 +247,21 @@ func (client *NodeMgmtClient) CallReloadSeedsEndpoint(pod *corev1.Pod) error {
method: http.MethodPost,
}

_, err := callNodeMgmtEndpoint(client, request)
_, err := callNodeMgmtEndpoint(client, request, "")
return err
}

func callNodeMgmtEndpoint(client *NodeMgmtClient, request nodeMgmtRequest) ([]byte, error) {
func callNodeMgmtEndpoint(client *NodeMgmtClient, request nodeMgmtRequest, contentType string) ([]byte, error) {
client.Log.Info("client::callNodeMgmtEndpoint")

url := fmt.Sprintf("%s://%s:8080%s", client.Protocol, request.host, request.endpoint)
req, err := http.NewRequest(request.method, url, nil)

var reqBody io.Reader
if len(request.body) > 0 {
reqBody = bytes.NewBuffer(request.body)
}

req, err := http.NewRequest(request.method, url, reqBody)
if err != nil {
client.Log.Error(err, "unable to create request for Node Management Endpoint")
return nil, err
Expand All @@ -233,6 +278,10 @@ func callNodeMgmtEndpoint(client *NodeMgmtClient, request nodeMgmtRequest) ([]by
req = req.WithContext(ctx)
}

if contentType != "" {
req.Header.Set("Content-Type", contentType)
}

res, err := client.Client.Do(req)
if err != nil {
client.Log.Error(err, "unable to perform request to Node Management Endpoint")
Expand Down
39 changes: 32 additions & 7 deletions operator/pkg/reconciliation/reconcile_racks.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@ func (rc *ReconciliationContext) UpdateStatus() result.ReconcileResult {
status = &api.CassandraDatacenterStatus{}
dc.Status.DeepCopyInto(status)
oldDc.Status.DeepCopyInto(&dc.Status)

if !reflect.DeepEqual(dc, oldDc) {
patch := client.MergeFrom(oldDc)
if err := rc.Client.Patch(rc.Ctx, dc, patch); err != nil {
Expand Down Expand Up @@ -1710,11 +1710,11 @@ func (rc *ReconciliationContext) CheckConditionInitializedAndReady() result.Reco
dc := rc.Datacenter
dcPatch := client.MergeFrom(dc.DeepCopy())
logger := rc.ReqLogger

updated := false
updated = rc.setCondition(
api.NewDatacenterCondition(api.DatacenterInitialized, corev1.ConditionTrue)) || updated

if dc.GetConditionStatus(api.DatacenterStopped) == corev1.ConditionFalse {
updated = rc.setCondition(
api.NewDatacenterCondition(api.DatacenterReady, corev1.ConditionTrue)) || updated
Expand All @@ -1735,6 +1735,18 @@ func (rc *ReconciliationContext) CheckConditionInitializedAndReady() result.Reco
return result.Continue()
}

func (rc *ReconciliationContext) cleanupAfterScaling() error {
var err error

for idx := range rc.dcPods {
err = rc.NodeMgmtClient.CallKeyspaceCleanupEndpoint(rc.dcPods[idx], -1, "", nil)
if err == nil {
break
}
}
return err
}

func (rc *ReconciliationContext) CheckClearActionConditions() result.ReconcileResult {
dc := rc.Datacenter
logger := rc.ReqLogger
Expand All @@ -1744,13 +1756,26 @@ func (rc *ReconciliationContext) CheckClearActionConditions() result.ReconcileRe
// clearing conditions
actionConditionTypes := []api.DatacenterConditionType{
api.DatacenterReplacingNodes,
api.DatacenterScalingUp,
api.DatacenterUpdating,
api.DatacenterRollingRestart,
api.DatacenterResuming,
}
updated := false
for _, conditionType := range(actionConditionTypes) {

// Explicitly handle scaling up here because we want to run a cleanup afterwards
if dc.GetConditionStatus(api.DatacenterScalingUp) == corev1.ConditionTrue {
err := rc.cleanupAfterScaling()
if err != nil {
logger.Error(err, "error cleaning up after scaling datacenter")
return result.Error(err)
}

updated = rc.setCondition(
api.NewDatacenterCondition(api.DatacenterScalingUp, corev1.ConditionFalse)) || updated
}

for _, conditionType := range actionConditionTypes {

updated = rc.setCondition(
api.NewDatacenterCondition(conditionType, corev1.ConditionFalse)) || updated
}
Expand All @@ -1763,13 +1788,13 @@ func (rc *ReconciliationContext) CheckClearActionConditions() result.ReconcileRe
}

// There may have been changes to the CassandraDatacenter resource that we ignored
// while executing some action on the cluster. For example, a user may have
// while executing some action on the cluster. For example, a user may have
// requested to scale up the node count while we were in the middle of a rolling
// restart. To account for this, we requeue to ensure reconcile gets called again
// to pick up any such changes that we ignored previously.
return result.RequeueSoon(0)
}

// Nothing has changed, carry on
return result.Continue()
}
Expand Down

0 comments on commit 4a71be4

Please sign in to comment.