diff --git a/operator/pkg/psp/emm.go b/operator/pkg/psp/emm.go
index cfc79b3b7..38fb29411 100644
--- a/operator/pkg/psp/emm.go
+++ b/operator/pkg/psp/emm.go
@@ -23,6 +23,8 @@ package psp
 
 import (
+	"fmt"
+
 	"github.com/go-logr/logr"
 
 	corev1 "k8s.io/api/core/v1"
@@ -270,8 +272,17 @@ func (impl *EMMServiceImpl) getNodeNameSet() (utils.StringSet, error) {
 	if err != nil {
 		return nil, err
 	}
 
-	return utils.GetNodeNameSet(nodes), nil
+	// Only consider worker ("agent") nodes; control-plane nodes cannot host
+	// Cassandra pods and would inflate the availability math in checkNodeEMM.
+	agents := []*corev1.Node{}
+	for _, node := range nodes {
+		if node.Labels["kubernetes.io/role"] == "agent" {
+			agents = append(agents, node)
+		}
+	}
+
+	return utils.GetNodeNameSet(agents), nil
 }
 
 func (impl *EMMServiceImpl) getPodNameSet() utils.StringSet {
@@ -523,7 +539,10 @@ func checkNodeEMM(provider EMMService) result.ReconcileResult {
 	// pods
 	unavailableNodes := utils.UnionStringSet(plannedDownNodeNameSet, evacuateDataNodeNameSet)
 	availableNodes := utils.SubtractStringSet(allNodes, unavailableNodes)
 	if len(provider.getPodNameSet()) > len(availableNodes) {
+		logger.Info(fmt.Sprintf("Not enough space to do EMM. Total number of nodes: %v, unavailable: %v, available: %v, pods: %v",
+			len(allNodes), len(unavailableNodes), len(availableNodes), len(provider.getPodNameSet())))
+
 		anyUpdated := false
 		updated := false
 		for node := range unavailableNodes {
diff --git a/operator/pkg/reconciliation/check_nodes.go b/operator/pkg/reconciliation/check_nodes.go
index b8877fcd4..4e3a76ae4 100644
--- a/operator/pkg/reconciliation/check_nodes.go
+++ b/operator/pkg/reconciliation/check_nodes.go
@@ -137,9 +137,10 @@ func (rc *ReconciliationContext) GetAllNodes() ([]*corev1.Node, error) {
 	if err := rc.Client.List(rc.Ctx, nodeList, listOptions); err != nil {
 		return nil, err
 	}
-	var nodes []*corev1.Node
-	for _, node := range nodeList.Items {
-		nodes = append(nodes, &node)
+	nodeCount := len(nodeList.Items)
+	nodes := make([]*corev1.Node, nodeCount)
+	for i := 0; i < nodeCount; i++ {
+		nodes[i] = &nodeList.Items[i]
 	}
 	return nodes, nil
 }
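The GetAllNodes hunk fixes a classic Go pitfall: `&node` takes the address of the range variable, which (under pre-Go 1.22 loop-variable semantics, the toolchains this operator built with) is a single variable reused on every iteration, so every entry in the slice ends up pointing at the last node. A minimal standalone sketch of the failure mode; the types here are illustrative, not from this repo:

package main

import "fmt"

type Node struct{ Name string }

func main() {
	items := []Node{{"a"}, {"b"}, {"c"}}

	// Buggy pattern (pre-Go 1.22): every pointer aliases the one loop variable.
	var bad []*Node
	for _, node := range items {
		bad = append(bad, &node)
	}

	// Fixed pattern, as in GetAllNodes: take the address of the slice element.
	good := make([]*Node, len(items))
	for i := range items {
		good[i] = &items[i]
	}

	fmt.Println(bad[0].Name, bad[1].Name, bad[2].Name)    // c c c
	fmt.Println(good[0].Name, good[1].Name, good[2].Name) // a b c
}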
diff --git a/operator/pkg/reconciliation/decommission_node.go b/operator/pkg/reconciliation/decommission_node.go
index 9497fbd19..c8f1d941d 100644
--- a/operator/pkg/reconciliation/decommission_node.go
+++ b/operator/pkg/reconciliation/decommission_node.go
@@ -127,9 +127,8 @@ func (rc *ReconciliationContext) DecommissionNodeOnRack(rackName string, epData
 	}
 
 	if err := rc.NodeMgmtClient.CallDecommissionNodeEndpoint(pod); err != nil {
-		// TODO this returns a 500 when it works
-		// We are waiting for a new version of mgmt api with a fix for this
-		// return err
+		rc.ReqLogger.Info(fmt.Sprintf("Error from decommission attempt. This is only an attempt and can"+
+			" fail; it will be retried later if decommission has not started. Error: %v", err))
 	}
 
 	rc.ReqLogger.Info("Marking node as decommissioning")
@@ -159,14 +158,20 @@ func (rc *ReconciliationContext) CheckDecommissioningNodes(epData httphelper.Cas
 	for _, pod := range rc.dcPods {
 		if pod.Labels[api.CassNodeState] == stateDecommissioning {
 			if !IsDoneDecommissioning(pod, epData) {
-				rc.ReqLogger.Info("Node decommissioning, reconciling again soon")
+				if !HasStartedDecommissioning(pod, epData) {
+					rc.ReqLogger.Info("Decommission has not started, trying again")
+					if err := rc.NodeMgmtClient.CallDecommissionNodeEndpoint(pod); err != nil {
+						rc.ReqLogger.Info(fmt.Sprintf("Error from decommission attempt. This is only an attempt and can fail. Error: %v", err))
+					}
+				} else {
+					rc.ReqLogger.Info("Node decommissioning, reconciling again soon")
+				}
 			} else {
 				rc.ReqLogger.Info("Node finished decommissioning")
 				if res := rc.cleanUpAfterDecommissionedPod(pod); res != nil {
 					return res
 				}
 			}
-
 			return result.RequeueSoon(5)
 		}
 	}
@@ -195,7 +200,6 @@ func (rc *ReconciliationContext) cleanUpAfterDecommissionedPod(pod *corev1.Pod)
 	if err != nil {
 		return result.Error(err)
 	}
-
 	rc.ReqLogger.Info("Deleting pod PVCs")
 	err = rc.DeletePodPvcs(pod)
 	if err != nil {
@@ -214,6 +218,18 @@ func (rc *ReconciliationContext) cleanUpAfterDecommissionedPod(pod *corev1.Pod)
 	return nil
 }
 
+// HasStartedDecommissioning reports whether the endpoint matching the pod's
+// IP is already in a LEAVING state, i.e. decommission is underway.
+func HasStartedDecommissioning(pod *v1.Pod, epData httphelper.CassMetadataEndpoints) bool {
+	for idx := range epData.Entity {
+		ep := &epData.Entity[idx]
+		if ep.GetRpcAddress() == pod.Status.PodIP {
+			return strings.HasPrefix(ep.Status, "LEAVING")
+		}
+	}
+	return false
+}
+
 func IsDoneDecommissioning(pod *v1.Pod, epData httphelper.CassMetadataEndpoints) bool {
 	for idx := range epData.Entity {
 		ep := &epData.Entity[idx]
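The retry branch keys off Cassandra's decommission lifecycle as seen in gossip: an endpoint that has not begun leaving reports a normal status, moves to LEAVING once decommission starts, and ends at LEFT, the state TestRemoveResourcesWhenDone below drives. A minimal sketch of how the branches in CheckDecommissioningNodes partition those statuses; `classify` is an illustrative name, and the LEFT prefix for IsDoneDecommissioning is inferred from the tests rather than shown in this diff:

package main

import (
	"fmt"
	"strings"
)

// classify mirrors the branch order above: done first, then started, else retry.
func classify(status string) string {
	switch {
	case strings.HasPrefix(status, "LEFT"):
		return "done: clean up pod, PVCs, and statefulset"
	case strings.HasPrefix(status, "LEAVING"):
		return "in progress: requeue and check again soon"
	default:
		return "not started: call the decommission endpoint again"
	}
}

func main() {
	for _, s := range []string{"UP", "LEAVING", "LEFT"} {
		fmt.Printf("%-8s -> %s\n", s, classify(s))
	}
}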
diff --git a/operator/pkg/reconciliation/decommission_node_test.go b/operator/pkg/reconciliation/decommission_node_test.go
new file mode 100644
index 000000000..f78dd6864
--- /dev/null
+++ b/operator/pkg/reconciliation/decommission_node_test.go
@@ -0,0 +1,157 @@
+// Copyright DataStax, Inc.
+// Please see the included license file for details.
+
+package reconciliation
+
+import (
+	"context"
+	"io/ioutil"
+	"net/http"
+	"strings"
+	"testing"
+
+	appsv1 "k8s.io/api/apps/v1"
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	"k8s.io/apimachinery/pkg/runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	"github.com/datastax/cass-operator/operator/internal/result"
+	api "github.com/datastax/cass-operator/operator/pkg/apis/cassandra/v1beta1"
+	"github.com/datastax/cass-operator/operator/pkg/httphelper"
+	"github.com/datastax/cass-operator/operator/pkg/mocks"
+	"github.com/stretchr/testify/mock"
+)
+
+func TestRetryDecommissionNode(t *testing.T) {
+	rc, _, cleanupMockScr := setupTest()
+	defer cleanupMockScr()
+	state := "UP"
+	podIP := "192.168.101.11"
+
+	mockClient := &mocks.Client{}
+	rc.Client = mockClient
+
+	rc.Datacenter.SetCondition(api.DatacenterCondition{
+		Status: v1.ConditionTrue,
+		Type:   api.DatacenterScalingDown,
+	})
+	res := &http.Response{
+		StatusCode: http.StatusBadRequest,
+		Body:       ioutil.NopCloser(strings.NewReader("OK")),
+	}
+	mockHttpClient := &mocks.HttpClient{}
+	mockHttpClient.On("Do",
+		mock.MatchedBy(
+			func(req *http.Request) bool {
+				return req.URL.Path == "/api/v0/ops/node/decommission"
+			})).
+		Return(res, nil).
+		Once()
+
+	rc.NodeMgmtClient = httphelper.NodeMgmtClient{
+		Client:   mockHttpClient,
+		Log:      rc.ReqLogger,
+		Protocol: "http",
+	}
+
+	labels := make(map[string]string)
+	labels[api.CassNodeState] = stateDecommissioning
+
+	rc.dcPods = []*v1.Pod{{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:   "pod-1",
+			Labels: labels,
+		},
+		Status: v1.PodStatus{
+			PodIP: podIP,
+		},
+	}}
+
+	epData := httphelper.CassMetadataEndpoints{
+		Entity: []httphelper.EndpointState{
+			{
+				RpcAddress: podIP,
+				Status:     state,
+			},
+		},
+	}
+	r := rc.CheckDecommissioningNodes(epData)
+	if r != result.RequeueSoon(5) {
+		t.Fatalf("expected result of result.RequeueSoon(5) but got %s", r)
+	}
+}
+
+func TestRemoveResourcesWhenDone(t *testing.T) {
+	rc, _, cleanupMockScr := setupTest()
+	defer cleanupMockScr()
+	podIP := "192.168.101.11"
+	state := "LEFT"
+
+	mockClient := &mocks.Client{}
+	rc.Client = mockClient
+	rc.Datacenter.SetCondition(api.DatacenterCondition{
+		Status: v1.ConditionTrue,
+		Type:   api.DatacenterScalingDown,
+	})
+	mockStatus := &statusMock{}
+	k8sMockClientStatus(mockClient, mockStatus)
+
+	labels := make(map[string]string)
+	labels[api.CassNodeState] = stateDecommissioning
+
+	rc.dcPods = []*v1.Pod{{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:   "pod-1",
+			Labels: labels,
+		},
+		Status: v1.PodStatus{
+			PodIP: podIP,
+		},
+	}}
+
+	makeInt := func(i int32) *int32 {
+		return &i
+	}
+	ssLabels := make(map[string]string)
+	rc.statefulSets = []*appsv1.StatefulSet{{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:   "ss-1",
+			Labels: ssLabels,
+		},
+		Spec: appsv1.StatefulSetSpec{
+			Replicas: makeInt(1),
+		},
+	}}
+
+	epData := httphelper.CassMetadataEndpoints{
+		Entity: []httphelper.EndpointState{
+			{
+				RpcAddress: podIP,
+				Status:     state,
+			},
+		},
+	}
+
+	r := rc.CheckDecommissioningNodes(epData)
+	if r != result.RequeueSoon(5) {
+		t.Fatalf("expected result of result.RequeueSoon(5) but got %s", r)
+	}
+	if mockStatus.called != 1 {
+		t.Fatalf("expected 1 call to mockStatus but had %v", mockStatus.called)
+	}
+}
+
+type statusMock struct {
+	called int
+}
+
+func (s *statusMock) Update(ctx context.Context, obj runtime.Object, opts ...client.UpdateOption) error {
+	return nil
+}
+
+func (s *statusMock) Patch(ctx context.Context, obj runtime.Object, patch client.Patch, opts ...client.PatchOption) error {
+	s.called = s.called + 1
+	return nil
+}
diff --git a/operator/pkg/reconciliation/testing.go b/operator/pkg/reconciliation/testing.go
index 99e8f2c86..f2d86c015 100644
--- a/operator/pkg/reconciliation/testing.go
+++ b/operator/pkg/reconciliation/testing.go
@@ -184,6 +184,12 @@ func k8sMockClientGet(mockClient *mocks.Client, returnArg interface{}) *mock.Call
 		Once()
 }
 
+func k8sMockClientStatus(mockClient *mocks.Client, returnArg interface{}) *mock.Call {
+	return mockClient.On("Status").
+		Return(returnArg).
+		Once()
+}
+
 func k8sMockClientUpdate(mockClient *mocks.Client, returnArg interface{}) *mock.Call {
 	return mockClient.On("Update",
 		mock.MatchedBy(
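The new k8sMockClientStatus helper mirrors the existing Get/Update helpers: it stubs the mock client's Status() accessor so a test can substitute its own client.StatusWriter and count status writes, as TestRemoveResourcesWhenDone does with statusMock. A minimal usage sketch; the test name is hypothetical and it assumes the setupTest fixture from this package:

func TestStatusWriterStub(t *testing.T) {
	rc, _, cleanupMockScr := setupTest()
	defer cleanupMockScr()

	mockClient := &mocks.Client{}
	rc.Client = mockClient

	// Route rc.Client.Status() to the counting stub, once.
	mockStatus := &statusMock{}
	k8sMockClientStatus(mockClient, mockStatus)

	// Any code under test that patches status now hits statusMock.Patch.
	if err := rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, nil); err != nil {
		t.Fatal(err)
	}
	if mockStatus.called != 1 {
		t.Fatalf("expected 1 status patch, got %d", mockStatus.called)
	}
}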