Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KO-382 cleanup after scaleup #80

Merged
merged 2 commits into from
May 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 60 additions & 11 deletions operator/pkg/httphelper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@
package httphelper

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"time"

"github.com/go-logr/logr"
Expand All @@ -29,16 +32,17 @@ type nodeMgmtRequest struct {
host string
method string
timeout time.Duration
body []byte
}

func buildEndpoint(path string, queryParams ...string) string {
params := url.Values{}
for i := 0; i < len(queryParams) - 1; i = i + 2 {
for i := 0; i < len(queryParams)-1; i = i + 2 {
params[queryParams[i]] = []string{queryParams[i+1]}
}

url := &url.URL{
Path: path,
Path: path,
RawQuery: params.Encode(),
}
return url.String()
Expand Down Expand Up @@ -94,7 +98,7 @@ func (client *NodeMgmtClient) CallMetadataEndpointsEndpoint(pod *corev1.Pod) (Ca
method: http.MethodGet,
}

bytes, err := callNodeMgmtEndpoint(client, request)
bytes, err := callNodeMgmtEndpoint(client, request, "")
if err != nil {
return CassMetadataEndpoints{}, err

Expand Down Expand Up @@ -126,7 +130,7 @@ func (client *NodeMgmtClient) CallCreateRoleEndpoint(pod *corev1.Pod, username s
host: BuildPodHostFromPod(pod),
method: http.MethodPost,
}
_, err := callNodeMgmtEndpoint(client, request)
_, err := callNodeMgmtEndpoint(client, request, "")
return err
}

Expand All @@ -142,7 +146,7 @@ func (client *NodeMgmtClient) CallProbeClusterEndpoint(pod *corev1.Pod, consiste
method: http.MethodGet,
}

_, err := callNodeMgmtEndpoint(client, request)
_, err := callNodeMgmtEndpoint(client, request, "")
return err
}

Expand All @@ -159,7 +163,42 @@ func (client *NodeMgmtClient) CallDrainEndpoint(pod *corev1.Pod) error {
timeout: time.Minute * 2,
}

_, err := callNodeMgmtEndpoint(client, request)
_, err := callNodeMgmtEndpoint(client, request, "")
return err
}

func (client *NodeMgmtClient) CallKeyspaceCleanupEndpoint(pod *corev1.Pod, jobs int, keyspaceName string, tables []string) error {
client.Log.Info(
"calling Management API keyspace cleanup - POST /api/v0/ops/keyspace/cleanup",
"pod", pod.Name,
)
postData := make(map[string]interface{})
if jobs > -1 {
postData["jobs"] = strconv.Itoa(jobs)
}

if keyspaceName != "" {
postData["keyspace_name"] = keyspaceName
}

if len(tables) > 0 {
postData["tables"] = tables
}
Comment on lines +175 to +186
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think this is nicer? It's not exactly the same logic, yet

postData := struct {
    Jobs string `json:"jobs"`
    Keyspace string `json:"keyspace_name"`
    Tables []string `json:"tables"`
}{
    Jobs: strconv.Itoa(jobs),
    Keyspace: keyspaceName,
    Tables: tables,
}


body, err := json.Marshal(postData)
if err != nil {
return err
}

request := nodeMgmtRequest{
endpoint: fmt.Sprintf("/api/v0/ops/keyspace/cleanup"),
host: BuildPodHostFromPod(pod),
method: http.MethodPost,
timeout: time.Second * 20,
body: body,
}

_, err = callNodeMgmtEndpoint(client, request, "application/json")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is an http constant for this, I think

return err
}

Expand All @@ -176,7 +215,7 @@ func (client *NodeMgmtClient) CallLifecycleStartEndpointWithReplaceIp(pod *corev
)

endpoint := "/api/v0/lifecycle/start"

if replaceIp != "" {
endpoint = buildEndpoint(endpoint, "replace_ip", replaceIp)
}
Expand All @@ -188,7 +227,7 @@ func (client *NodeMgmtClient) CallLifecycleStartEndpointWithReplaceIp(pod *corev
timeout: 10 * time.Second,
}

_, err := callNodeMgmtEndpoint(client, request)
_, err := callNodeMgmtEndpoint(client, request, "")
return err
}

Expand All @@ -208,15 +247,21 @@ func (client *NodeMgmtClient) CallReloadSeedsEndpoint(pod *corev1.Pod) error {
method: http.MethodPost,
}

_, err := callNodeMgmtEndpoint(client, request)
_, err := callNodeMgmtEndpoint(client, request, "")
return err
}

func callNodeMgmtEndpoint(client *NodeMgmtClient, request nodeMgmtRequest) ([]byte, error) {
func callNodeMgmtEndpoint(client *NodeMgmtClient, request nodeMgmtRequest, contentType string) ([]byte, error) {
client.Log.Info("client::callNodeMgmtEndpoint")

url := fmt.Sprintf("%s://%s:8080%s", client.Protocol, request.host, request.endpoint)
req, err := http.NewRequest(request.method, url, nil)

var reqBody io.Reader
if len(request.body) > 0 {
reqBody = bytes.NewBuffer(request.body)
}

req, err := http.NewRequest(request.method, url, reqBody)
if err != nil {
client.Log.Error(err, "unable to create request for Node Management Endpoint")
return nil, err
Expand All @@ -233,6 +278,10 @@ func callNodeMgmtEndpoint(client *NodeMgmtClient, request nodeMgmtRequest) ([]by
req = req.WithContext(ctx)
}

if contentType != "" {
req.Header.Set("Content-Type", contentType)
}

res, err := client.Client.Do(req)
if err != nil {
client.Log.Error(err, "unable to perform request to Node Management Endpoint")
Expand Down
39 changes: 32 additions & 7 deletions operator/pkg/reconciliation/reconcile_racks.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,7 @@ func (rc *ReconciliationContext) UpdateStatus() result.ReconcileResult {
status = &api.CassandraDatacenterStatus{}
dc.Status.DeepCopyInto(status)
oldDc.Status.DeepCopyInto(&dc.Status)

if !reflect.DeepEqual(dc, oldDc) {
patch := client.MergeFrom(oldDc)
if err := rc.Client.Patch(rc.Ctx, dc, patch); err != nil {
Expand Down Expand Up @@ -1697,11 +1697,11 @@ func (rc *ReconciliationContext) CheckConditionInitializedAndReady() result.Reco
dc := rc.Datacenter
dcPatch := client.MergeFrom(dc.DeepCopy())
logger := rc.ReqLogger

updated := false
updated = rc.setCondition(
api.NewDatacenterCondition(api.DatacenterInitialized, corev1.ConditionTrue)) || updated

if dc.GetConditionStatus(api.DatacenterStopped) == corev1.ConditionFalse {
updated = rc.setCondition(
api.NewDatacenterCondition(api.DatacenterReady, corev1.ConditionTrue)) || updated
Expand All @@ -1722,6 +1722,18 @@ func (rc *ReconciliationContext) CheckConditionInitializedAndReady() result.Reco
return result.Continue()
}

func (rc *ReconciliationContext) cleanupAfterScaling() error {
var err error

for idx := range rc.dcPods {
err = rc.NodeMgmtClient.CallKeyspaceCleanupEndpoint(rc.dcPods[idx], -1, "", nil)
if err == nil {
break
}
}
return err
}

func (rc *ReconciliationContext) CheckClearActionConditions() result.ReconcileResult {
dc := rc.Datacenter
logger := rc.ReqLogger
Expand All @@ -1731,13 +1743,26 @@ func (rc *ReconciliationContext) CheckClearActionConditions() result.ReconcileRe
// clearing conditions
actionConditionTypes := []api.DatacenterConditionType{
api.DatacenterReplacingNodes,
api.DatacenterScalingUp,
api.DatacenterUpdating,
api.DatacenterRollingRestart,
api.DatacenterResuming,
}
updated := false
for _, conditionType := range(actionConditionTypes) {

// Explicitly handle scaling up here because we want to run a cleanup afterwards
if dc.GetConditionStatus(api.DatacenterScalingUp) == corev1.ConditionTrue {
err := rc.cleanupAfterScaling()
if err != nil {
logger.Error(err, "error cleaning up after scaling datacenter")
return result.Error(err)
}

updated = rc.setCondition(
api.NewDatacenterCondition(api.DatacenterScalingUp, corev1.ConditionFalse)) || updated
}

for _, conditionType := range actionConditionTypes {

updated = rc.setCondition(
api.NewDatacenterCondition(conditionType, corev1.ConditionFalse)) || updated
}
Expand All @@ -1750,13 +1775,13 @@ func (rc *ReconciliationContext) CheckClearActionConditions() result.ReconcileRe
}

// There may have been changes to the CassandraDatacenter resource that we ignored
// while executing some action on the cluster. For example, a user may have
// while executing some action on the cluster. For example, a user may have
// requested to scale up the node count while we were in the middle of a rolling
// restart. To account for this, we requeue to ensure reconcile gets called again
// to pick up any such changes that we ignored previously.
return result.RequeueSoon(0)
}

// Nothing has changed, carry on
return result.Continue()
}
Expand Down