diff --git a/operator/pkg/httphelper/client.go b/operator/pkg/httphelper/client.go index 444626065..3a5de9cbb 100644 --- a/operator/pkg/httphelper/client.go +++ b/operator/pkg/httphelper/client.go @@ -4,12 +4,15 @@ package httphelper import ( + "bytes" "context" "encoding/json" "fmt" + "io" "io/ioutil" "net/http" "net/url" + "strconv" "time" "github.com/go-logr/logr" @@ -29,16 +32,17 @@ type nodeMgmtRequest struct { host string method string timeout time.Duration + body []byte } func buildEndpoint(path string, queryParams ...string) string { params := url.Values{} - for i := 0; i < len(queryParams) - 1; i = i + 2 { + for i := 0; i < len(queryParams)-1; i = i + 2 { params[queryParams[i]] = []string{queryParams[i+1]} } url := &url.URL{ - Path: path, + Path: path, RawQuery: params.Encode(), } return url.String() @@ -94,7 +98,7 @@ func (client *NodeMgmtClient) CallMetadataEndpointsEndpoint(pod *corev1.Pod) (Ca method: http.MethodGet, } - bytes, err := callNodeMgmtEndpoint(client, request) + bytes, err := callNodeMgmtEndpoint(client, request, "") if err != nil { return CassMetadataEndpoints{}, err @@ -126,7 +130,7 @@ func (client *NodeMgmtClient) CallCreateRoleEndpoint(pod *corev1.Pod, username s host: BuildPodHostFromPod(pod), method: http.MethodPost, } - _, err := callNodeMgmtEndpoint(client, request) + _, err := callNodeMgmtEndpoint(client, request, "") return err } @@ -142,7 +146,7 @@ func (client *NodeMgmtClient) CallProbeClusterEndpoint(pod *corev1.Pod, consiste method: http.MethodGet, } - _, err := callNodeMgmtEndpoint(client, request) + _, err := callNodeMgmtEndpoint(client, request, "") return err } @@ -159,7 +163,42 @@ func (client *NodeMgmtClient) CallDrainEndpoint(pod *corev1.Pod) error { timeout: time.Minute * 2, } - _, err := callNodeMgmtEndpoint(client, request) + _, err := callNodeMgmtEndpoint(client, request, "") + return err +} + +func (client *NodeMgmtClient) CallKeyspaceCleanupEndpoint(pod *corev1.Pod, jobs int, keyspaceName string, tables []string) error { + client.Log.Info( + "calling Management API keyspace cleanup - POST /api/v0/ops/keyspace/cleanup", + "pod", pod.Name, + ) + postData := make(map[string]interface{}) + if jobs > -1 { + postData["jobs"] = strconv.Itoa(jobs) + } + + if keyspaceName != "" { + postData["keyspace_name"] = keyspaceName + } + + if len(tables) > 0 { + postData["tables"] = tables + } + + body, err := json.Marshal(postData) + if err != nil { + return err + } + + request := nodeMgmtRequest{ + endpoint: fmt.Sprintf("/api/v0/ops/keyspace/cleanup"), + host: BuildPodHostFromPod(pod), + method: http.MethodPost, + timeout: time.Second * 20, + body: body, + } + + _, err = callNodeMgmtEndpoint(client, request, "application/json") return err } @@ -176,7 +215,7 @@ func (client *NodeMgmtClient) CallLifecycleStartEndpointWithReplaceIp(pod *corev ) endpoint := "/api/v0/lifecycle/start" - + if replaceIp != "" { endpoint = buildEndpoint(endpoint, "replace_ip", replaceIp) } @@ -188,7 +227,7 @@ func (client *NodeMgmtClient) CallLifecycleStartEndpointWithReplaceIp(pod *corev timeout: 10 * time.Second, } - _, err := callNodeMgmtEndpoint(client, request) + _, err := callNodeMgmtEndpoint(client, request, "") return err } @@ -208,15 +247,21 @@ func (client *NodeMgmtClient) CallReloadSeedsEndpoint(pod *corev1.Pod) error { method: http.MethodPost, } - _, err := callNodeMgmtEndpoint(client, request) + _, err := callNodeMgmtEndpoint(client, request, "") return err } -func callNodeMgmtEndpoint(client *NodeMgmtClient, request nodeMgmtRequest) ([]byte, error) { +func callNodeMgmtEndpoint(client *NodeMgmtClient, request nodeMgmtRequest, contentType string) ([]byte, error) { client.Log.Info("client::callNodeMgmtEndpoint") url := fmt.Sprintf("%s://%s:8080%s", client.Protocol, request.host, request.endpoint) - req, err := http.NewRequest(request.method, url, nil) + + var reqBody io.Reader + if len(request.body) > 0 { + reqBody = bytes.NewBuffer(request.body) + } + + req, err := http.NewRequest(request.method, url, reqBody) if err != nil { client.Log.Error(err, "unable to create request for Node Management Endpoint") return nil, err @@ -233,6 +278,10 @@ func callNodeMgmtEndpoint(client *NodeMgmtClient, request nodeMgmtRequest) ([]by req = req.WithContext(ctx) } + if contentType != "" { + req.Header.Set("Content-Type", contentType) + } + res, err := client.Client.Do(req) if err != nil { client.Log.Error(err, "unable to perform request to Node Management Endpoint") diff --git a/operator/pkg/reconciliation/reconcile_racks.go b/operator/pkg/reconciliation/reconcile_racks.go index 2a669ee03..fcf11ad81 100644 --- a/operator/pkg/reconciliation/reconcile_racks.go +++ b/operator/pkg/reconciliation/reconcile_racks.go @@ -796,7 +796,7 @@ func (rc *ReconciliationContext) UpdateStatus() result.ReconcileResult { status = &api.CassandraDatacenterStatus{} dc.Status.DeepCopyInto(status) oldDc.Status.DeepCopyInto(&dc.Status) - + if !reflect.DeepEqual(dc, oldDc) { patch := client.MergeFrom(oldDc) if err := rc.Client.Patch(rc.Ctx, dc, patch); err != nil { @@ -1697,11 +1697,11 @@ func (rc *ReconciliationContext) CheckConditionInitializedAndReady() result.Reco dc := rc.Datacenter dcPatch := client.MergeFrom(dc.DeepCopy()) logger := rc.ReqLogger - + updated := false updated = rc.setCondition( api.NewDatacenterCondition(api.DatacenterInitialized, corev1.ConditionTrue)) || updated - + if dc.GetConditionStatus(api.DatacenterStopped) == corev1.ConditionFalse { updated = rc.setCondition( api.NewDatacenterCondition(api.DatacenterReady, corev1.ConditionTrue)) || updated @@ -1722,6 +1722,18 @@ func (rc *ReconciliationContext) CheckConditionInitializedAndReady() result.Reco return result.Continue() } +func (rc *ReconciliationContext) cleanupAfterScaling() error { + var err error + + for idx := range rc.dcPods { + err = rc.NodeMgmtClient.CallKeyspaceCleanupEndpoint(rc.dcPods[idx], -1, "", nil) + if err == nil { + break + } + } + return err +} + func (rc *ReconciliationContext) CheckClearActionConditions() result.ReconcileResult { dc := rc.Datacenter logger := rc.ReqLogger @@ -1731,13 +1743,26 @@ func (rc *ReconciliationContext) CheckClearActionConditions() result.ReconcileRe // clearing conditions actionConditionTypes := []api.DatacenterConditionType{ api.DatacenterReplacingNodes, - api.DatacenterScalingUp, api.DatacenterUpdating, api.DatacenterRollingRestart, api.DatacenterResuming, } updated := false - for _, conditionType := range(actionConditionTypes) { + + // Explicitly handle scaling up here because we want to run a cleanup afterwards + if dc.GetConditionStatus(api.DatacenterScalingUp) == corev1.ConditionTrue { + err := rc.cleanupAfterScaling() + if err != nil { + logger.Error(err, "error cleaning up after scaling datacenter") + return result.Error(err) + } + + updated = rc.setCondition( + api.NewDatacenterCondition(api.DatacenterScalingUp, corev1.ConditionFalse)) || updated + } + + for _, conditionType := range actionConditionTypes { + updated = rc.setCondition( api.NewDatacenterCondition(conditionType, corev1.ConditionFalse)) || updated } @@ -1750,13 +1775,13 @@ func (rc *ReconciliationContext) CheckClearActionConditions() result.ReconcileRe } // There may have been changes to the CassandraDatacenter resource that we ignored - // while executing some action on the cluster. For example, a user may have + // while executing some action on the cluster. For example, a user may have // requested to scale up the node count while we were in the middle of a rolling // restart. To account for this, we requeue to ensure reconcile gets called again // to pick up any such changes that we ignored previously. return result.RequeueSoon(0) } - + // Nothing has changed, carry on return result.Continue() }