From 9a5098c9667ae8134fdd7920abde8b9436f95f26 Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Thu, 31 Aug 2023 17:55:32 +0300 Subject: [PATCH] Envtesting... (#491) * First running envtest.. Read target mgmt-api port from the container namedPorts Modify cooldownPeriod to be configurable, modify the test to allow more namespaces and use blackbox fake StS controller The /start command was not using the correct BuildPodHostFromPod function Fake mgmt-api httptest server will now set the pod status when it receives the start call Add multi rack and multi node tests Modify liveness and readiness checks ports Move pkg/internal/result to internal/result, allow override of the requeue duration calculation Fix tests that did not set the PodIP correctly, move duration timers to suite_test in controllers Add deletion checks Add scale up functionality.. a bit broken though Remove unused FakeServerWithSuccess Update the tests, the ProgressState did not work since Kubernetes has precision of 1s in the metav1.Time * Modify after rebase * Fix Dockerfile after result movement --- Dockerfile | 4 +- .../cassandradatacenter_controller.go | 12 +- .../cassandradatacenter_controller_test.go | 254 ++++++++++++++++ internal/controllers/cassandra/suite_test.go | 62 +++- .../control/cassandratask_controller.go | 14 +- .../control/cassandratask_controller_test.go | 4 +- internal/controllers/control/jobs.go | 4 +- internal/envtest/fake_mgmtapi.go | 122 ++++++++ internal/envtest/statefulset_controller.go | 274 ++++++++++++++++++ .../result/result_helper.go | 4 +- .../result/result_helper_test.go | 0 pkg/httphelper/client.go | 107 +++++-- pkg/httphelper/client_test.go | 3 +- pkg/reconciliation/constructor.go | 1 + pkg/reconciliation/decommission_node.go | 2 +- pkg/reconciliation/decommission_node_test.go | 2 +- pkg/reconciliation/handler.go | 2 +- pkg/reconciliation/reconcile_configsecret.go | 2 +- pkg/reconciliation/reconcile_datacenter.go | 2 +- pkg/reconciliation/reconcile_endpoints.go | 2 +- pkg/reconciliation/reconcile_fql.go | 3 +- pkg/reconciliation/reconcile_fql_test.go | 2 +- pkg/reconciliation/reconcile_racks.go | 9 +- pkg/reconciliation/reconcile_racks_test.go | 4 +- pkg/reconciliation/reconcile_services.go | 2 +- 25 files changed, 828 insertions(+), 69 deletions(-) create mode 100644 internal/controllers/cassandra/cassandradatacenter_controller_test.go create mode 100644 internal/envtest/fake_mgmtapi.go create mode 100644 internal/envtest/statefulset_controller.go rename {pkg/internal => internal}/result/result_helper.go (89%) rename {pkg/internal => internal}/result/result_helper_test.go (100%) diff --git a/Dockerfile b/Dockerfile index f1dd9531..4cadec57 100644 --- a/Dockerfile +++ b/Dockerfile @@ -12,10 +12,10 @@ COPY go.sum go.sum RUN go mod download # Copy the go source -COPY cmd/main.go cmd/main.go +COPY cmd/ cmd/ COPY apis/ apis/ COPY pkg/ pkg/ -COPY internal/controllers/ internal/controllers/ +COPY internal/ internal/ # Build RUN CGO_ENABLED=0 GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH} go build -a -o manager cmd/main.go diff --git a/internal/controllers/cassandra/cassandradatacenter_controller.go b/internal/controllers/cassandra/cassandradatacenter_controller.go index d4ce6fd2..133cb352 100644 --- a/internal/controllers/cassandra/cassandradatacenter_controller.go +++ b/internal/controllers/cassandra/cassandradatacenter_controller.go @@ -47,6 +47,11 @@ import ( configv1beta1 "github.com/k8ssandra/cass-operator/apis/config/v1beta1" ) +var ( + cooldownPeriod = 20 * time.Second + minimumRequeueTime = 
500 * time.Millisecond +) + // datastax.com groups //+kubebuilder:rbac:groups=cassandra.datastax.com,namespace=cass-operator,resources=cassandradatacenters,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=cassandra.datastax.com,namespace=cass-operator,resources=cassandradatacenters/status,verbs=get;update;patch @@ -117,7 +122,7 @@ func (r *CassandraDatacenterReconciler) Reconcile(ctx context.Context, request c // Error reading the object logger.Error(err, "Failed to get CassandraDatacenter.") - return ctrl.Result{RequeueAfter: 10 * time.Second}, err + return ctrl.Result{}, err } if err := rc.IsValid(rc.Datacenter); err != nil { @@ -127,7 +132,6 @@ func (r *CassandraDatacenterReconciler) Reconcile(ctx context.Context, request c } // TODO fold this into the quiet period - cooldownPeriod := time.Second * 20 lastNodeStart := rc.Datacenter.Status.LastServerNodeStarted cooldownTime := time.Until(lastNodeStart.Add(cooldownPeriod)) @@ -152,8 +156,8 @@ func (r *CassandraDatacenterReconciler) Reconcile(ctx context.Context, request c // Prevent immediate requeue if res.Requeue { - if res.RequeueAfter.Milliseconds() < 500 { - res.RequeueAfter = time.Duration(500 * time.Millisecond) + if res.RequeueAfter < minimumRequeueTime { + res.RequeueAfter = minimumRequeueTime } } return res, err diff --git a/internal/controllers/cassandra/cassandradatacenter_controller_test.go b/internal/controllers/cassandra/cassandradatacenter_controller_test.go new file mode 100644 index 00000000..27cd0a4b --- /dev/null +++ b/internal/controllers/cassandra/cassandradatacenter_controller_test.go @@ -0,0 +1,254 @@ +package controllers + +import ( + "context" + "fmt" + "math/rand" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/client" + + cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" +) + +var ( + testNamespaceName string +) + +func clusterName() string { + // TODO Modify when multiple clusters are needed + return "cluster1" +} + +func createDatacenter(ctx context.Context, dcName string, nodeCount, rackCount int) cassdcapi.CassandraDatacenter { + testDc := createStubCassDc(dcName, int32(nodeCount)) + + testDc.Spec.Racks = make([]cassdcapi.Rack, rackCount) + for i := 0; i < rackCount; i++ { + testDc.Spec.Racks[i] = cassdcapi.Rack{ + Name: fmt.Sprintf("r%d", i), + } + } + + Expect(k8sClient.Create(ctx, &testDc)).Should(Succeed()) + return testDc +} + +func deleteDatacenter(ctx context.Context, dcName string) { + dc := cassdcapi.CassandraDatacenter{} + dcKey := types.NamespacedName{Name: dcName, Namespace: testNamespaceName} + Expect(k8sClient.Get(ctx, dcKey, &dc)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &dc)).To(Succeed()) +} + +func waitForDatacenterProgress(ctx context.Context, dcName string, state cassdcapi.ProgressState) { + Eventually(func(g Gomega) { + dc := cassdcapi.CassandraDatacenter{} + key := types.NamespacedName{Namespace: testNamespaceName, Name: dcName} + + g.Expect(k8sClient.Get(ctx, key, &dc)).To(Succeed()) + g.Expect(dc.Status.CassandraOperatorProgress).To(Equal(state)) + }).WithTimeout(20 * time.Second).WithPolling(200 * time.Millisecond).WithContext(ctx).Should(Succeed()) +} + +func waitForDatacenterReady(ctx context.Context, dcName string) { + 
waitForDatacenterProgress(ctx, dcName, cassdcapi.ProgressReady) +} + +var _ = Describe("CassandraDatacenter tests", func() { + Describe("Creating a new datacenter", func() { + Context("Single datacenter", func() { + BeforeEach(func() { + testNamespaceName = fmt.Sprintf("test-cassdc-%d", rand.Int31()) + testNamespace := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: testNamespaceName, + }, + } + Expect(k8sClient.Create(context.Background(), testNamespace)).Should(Succeed()) + }) + + AfterEach(func() { + testNamespaceDel := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: testNamespaceName, + }, + } + Expect(k8sClient.Delete(context.TODO(), testNamespaceDel)).To(Succeed()) + }) + When("There is a single rack and a single node", func() { + It("should end up in a Ready state", func(ctx SpecContext) { + dcName := "dc1" + + createDatacenter(ctx, dcName, 1, 1) + waitForDatacenterReady(ctx, dcName) + + verifyStsCount(ctx, dcName, 1, 1) + verifyPodCount(ctx, dcName, 1) + + deleteDatacenter(ctx, dcName) + verifyDatacenterDeleted(ctx, dcName) + }) + It("should be able to scale up", func(ctx SpecContext) { + dcName := "dc11" + + dc := createDatacenter(ctx, dcName, 1, 1) + waitForDatacenterReady(ctx, dcName) + + verifyStsCount(ctx, dcName, 1, 1) + verifyPodCount(ctx, dcName, 1) + + key := types.NamespacedName{Namespace: testNamespaceName, Name: dcName} + Expect(k8sClient.Get(ctx, key, &dc)).To(Succeed()) + + By("Updating the size to 3") + dc.Spec.Size = 3 + Expect(k8sClient.Update(ctx, &dc)).To(Succeed()) + + Eventually(func(g Gomega) { + verifyStsCount(ctx, dcName, 1, 3) + verifyPodCount(ctx, dcName, 3) + }) + + waitForDatacenterReady(ctx, dcName) + + deleteDatacenter(ctx, dcName) + verifyDatacenterDeleted(ctx, dcName) + }) + }) + When("There are multiple nodes in a single rack", func() { + It("should end up in a Ready state", func(ctx SpecContext) { + dcName := "dc2" + + createDatacenter(ctx, dcName, 3, 1) + + waitForDatacenterReady(ctx, dcName) + + verifyStsCount(ctx, dcName, 1, 3) + verifyPodCount(ctx, dcName, 3) + + deleteDatacenter(ctx, dcName) + verifyDatacenterDeleted(ctx, dcName) + }) + }) + When("There are multiple nodes in multiple racks", func() { + It("should end up in a Ready state", func(ctx SpecContext) { + dcName := "dc3" + + createDatacenter(ctx, dcName, 9, 3) + waitForDatacenterReady(ctx, dcName) + + verifyStsCount(ctx, dcName, 3, 3) + verifyPodCount(ctx, dcName, 9) + + deleteDatacenter(ctx, dcName) + verifyDatacenterDeleted(ctx, dcName) + }) + }) + }) + }) +}) + +func verifyStsCount(ctx context.Context, dcName string, rackCount, podsPerSts int) { + Eventually(func(g Gomega) { + stsAll := &appsv1.StatefulSetList{} + g.Expect(k8sClient.List(ctx, stsAll, client.MatchingLabels{cassdcapi.DatacenterLabel: dcName}, client.InNamespace(testNamespaceName))).To(Succeed()) + g.Expect(len(stsAll.Items)).To(Equal(rackCount)) + + for _, sts := range stsAll.Items { + rackName := sts.Labels[cassdcapi.RackLabel] + + podList := &corev1.PodList{} + g.Expect(k8sClient.List(ctx, podList, client.MatchingLabels{cassdcapi.DatacenterLabel: dcName, cassdcapi.RackLabel: rackName}, client.InNamespace(testNamespaceName))).To(Succeed()) + g.Expect(len(podList.Items)).To(Equal(podsPerSts)) + } + }).Should(Succeed()) +} + +func verifyPodCount(ctx context.Context, dcName string, podCount int) { + Eventually(func(g Gomega) { + podList := &corev1.PodList{} + g.Expect(k8sClient.List(ctx, podList, client.MatchingLabels{cassdcapi.DatacenterLabel: dcName}, 
client.InNamespace(testNamespaceName))).To(Succeed()) + g.Expect(len(podList.Items)).To(Equal(podCount)) + }).Should(Succeed()) +} + +func verifyDatacenterDeleted(ctx context.Context, dcName string) { + Eventually(func(g Gomega) { + // Envtest has no garbage collection, so we can only compare that the ownerReferences are correct and they would be GCed (for items which we do not remove) + + // Check that DC no longer exists + dc := &cassdcapi.CassandraDatacenter{} + dcKey := types.NamespacedName{Name: dcName, Namespace: testNamespaceName} + err := k8sClient.Get(ctx, dcKey, dc) + g.Expect(errors.IsNotFound(err)).To(BeTrue()) + + // Check that services would be autodeleted + svcList := &corev1.ServiceList{} + g.Expect(k8sClient.List(ctx, svcList, client.MatchingLabels{cassdcapi.DatacenterLabel: dcName}, client.InNamespace(testNamespaceName))).To(Succeed()) + for _, svc := range svcList.Items { + g.Expect(len(svc.OwnerReferences)).To(Equal(1)) + verifyOwnerReference(g, svc.OwnerReferences[0], dcName) + } + + // Check that all StS would be autoremoved + stsAll := &appsv1.StatefulSetList{} + g.Expect(k8sClient.List(ctx, stsAll, client.MatchingLabels{cassdcapi.DatacenterLabel: dcName}, client.InNamespace(testNamespaceName))).To(Succeed()) + for _, sts := range stsAll.Items { + g.Expect(len(sts.OwnerReferences)).To(Equal(1)) + verifyOwnerReference(g, sts.OwnerReferences[0], dcName) + } + + // Check that all PVCs were removed (we remove these) + pvcList := &corev1.PersistentVolumeClaimList{} + g.Expect(k8sClient.List(ctx, pvcList, client.MatchingLabels{cassdcapi.DatacenterLabel: dcName}, client.InNamespace(testNamespaceName))).To(Succeed()) + for _, pvc := range pvcList.Items { + g.Expect(pvc.GetDeletionTimestamp()).ToNot(BeNil()) + } + + }).WithTimeout(10 * time.Second).WithPolling(100 * time.Millisecond).Should(Succeed()) +} + +func verifyOwnerReference(g Gomega, ownerRef metav1.OwnerReference, dcName string) { + g.Expect(ownerRef.Kind).To(Equal("CassandraDatacenter")) + g.Expect(ownerRef.Name).To(Equal(dcName)) + g.Expect(ownerRef.APIVersion).To(Equal("cassandra.datastax.com/v1beta1")) +} + +func createStubCassDc(dcName string, nodeCount int32) cassdcapi.CassandraDatacenter { + return cassdcapi.CassandraDatacenter{ + ObjectMeta: metav1.ObjectMeta{ + Name: dcName, + Namespace: testNamespaceName, + Annotations: map[string]string{}, + }, + Spec: cassdcapi.CassandraDatacenterSpec{ + ManagementApiAuth: cassdcapi.ManagementApiAuthConfig{ + Insecure: &cassdcapi.ManagementApiAuthInsecureConfig{}, + }, + ClusterName: clusterName(), + ServerType: "cassandra", + ServerVersion: "4.0.7", + Size: nodeCount, + StorageConfig: cassdcapi.StorageConfig{ + CassandraDataVolumeClaimSpec: &corev1.PersistentVolumeClaimSpec{ + StorageClassName: pointer.String("default"), + AccessModes: []corev1.PersistentVolumeAccessMode{"ReadWriteOnce"}, + Resources: corev1.ResourceRequirements{ + Requests: map[corev1.ResourceName]resource.Quantity{"storage": resource.MustParse("1Gi")}, + }, + }, + }, + }, + Status: cassdcapi.CassandraDatacenterStatus{}, + } +} diff --git a/internal/controllers/cassandra/suite_test.go b/internal/controllers/cassandra/suite_test.go index d8d9fb30..15f0875a 100644 --- a/internal/controllers/cassandra/suite_test.go +++ b/internal/controllers/cassandra/suite_test.go @@ -20,9 +20,14 @@ import ( "context" "path/filepath" "testing" + "time" + "github.com/k8ssandra/cass-operator/internal/controllers/control" + internal "github.com/k8ssandra/cass-operator/internal/envtest" + 
"github.com/k8ssandra/cass-operator/internal/result" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "go.uber.org/zap/zapcore" "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -31,6 +36,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log/zap" cassandradatastaxcomv1beta1 "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" + configv1beta1 "github.com/k8ssandra/cass-operator/apis/config/v1beta1" + controlapi "github.com/k8ssandra/cass-operator/apis/control/v1alpha1" + "github.com/k8ssandra/cass-operator/pkg/images" + "github.com/k8ssandra/cass-operator/pkg/reconciliation" //+kubebuilder:scaffold:imports ) @@ -48,7 +57,14 @@ func TestAPIs(t *testing.T) { } var _ = BeforeSuite(func() { - logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) + opts := zap.Options{ + Development: true, + TimeEncoder: zapcore.ISO8601TimeEncoder, + DestWriter: GinkgoWriter, + } + + logf.SetLogger(zap.New(zap.UseFlagOptions(&opts))) + ctx, cancel = context.WithCancel(context.TODO()) By("bootstrapping test environment") @@ -57,6 +73,8 @@ var _ = BeforeSuite(func() { ErrorIfCRDPathMissing: true, } + Expect(images.ParseImageConfig(filepath.Join("..", "..", "..", "config", "manager", "image_config.yaml"))).To(Succeed()) + cfg, err := testEnv.Start() Expect(err).NotTo(HaveOccurred()) Expect(cfg).NotTo(BeNil()) @@ -64,6 +82,9 @@ var _ = BeforeSuite(func() { err = cassandradatastaxcomv1beta1.AddToScheme(scheme.Scheme) Expect(err).NotTo(HaveOccurred()) + err = controlapi.AddToScheme(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) + //+kubebuilder:scaffold:scheme k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) @@ -76,14 +97,45 @@ var _ = BeforeSuite(func() { }) Expect(err).ToNot(HaveOccurred()) + operConfig := &configv1beta1.OperatorConfig{ + OLMDeployed: false, + } + err = (&CassandraDatacenterReconciler{ - Client: k8sManager.GetClient(), - Log: ctrl.Log.WithName("controllers").WithName("CassandraDatacenter"), - Scheme: k8sManager.GetScheme(), - Recorder: k8sManager.GetEventRecorderFor("cass-operator"), + Client: k8sClient, + Log: ctrl.Log.WithName("controllers").WithName("CassandraDatacenter"), + Scheme: k8sManager.GetScheme(), + Recorder: k8sManager.GetEventRecorderFor("cass-operator"), + OperatorConfig: operConfig, }).SetupWithManager(k8sManager) Expect(err).ToNot(HaveOccurred()) + err = (&internal.StatefulSetReconciler{ + Client: k8sClient, + Scheme: k8sManager.GetScheme(), + }).SetupWithManager(k8sManager) + Expect(err).ToNot(HaveOccurred()) + + err = (&control.CassandraTaskReconciler{ + Client: k8sClient, + Scheme: k8sManager.GetScheme(), + }).SetupWithManager(k8sManager) + Expect(err).ToNot(HaveOccurred()) + + // Reduce the polling times and sleeps to speed up the tests + cooldownPeriod = 1 * time.Millisecond + minimumRequeueTime = 10 * time.Millisecond + result.DurationFunc = func(msec int) time.Duration { + return time.Duration(msec * int(time.Millisecond)) + } + + reconciliation.QuietDurationFunc = func(msec int) time.Duration { + return time.Duration(msec * int(time.Millisecond)) + } + + control.JobRunningRequeue = time.Duration(1 * time.Millisecond) + control.TaskRunningRequeue = time.Duration(1 * time.Millisecond) + go func() { defer GinkgoRecover() err = k8sManager.Start(ctx) diff --git a/internal/controllers/control/cassandratask_controller.go b/internal/controllers/control/cassandratask_controller.go index f1a69b7a..648a5b3e 100644 --- 
a/internal/controllers/control/cassandratask_controller.go +++ b/internal/controllers/control/cassandratask_controller.go @@ -54,8 +54,8 @@ const ( // These are vars to allow modifications for testing var ( - jobRunningRequeue = 10 * time.Second - taskRunningRequeue = time.Duration(5 * time.Second) + JobRunningRequeue = 10 * time.Second + TaskRunningRequeue = time.Duration(5 * time.Second) ) // CassandraTaskReconciler reconciles a CassandraJob object @@ -219,12 +219,12 @@ func (r *CassandraTaskReconciler) Reconcile(ctx context.Context, req ctrl.Reques if len(activeTasks) > 0 { if cassTask.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent { logger.V(1).Info("this job isn't allowed to run due to ConcurrencyPolicy restrictions", "activeTasks", len(activeTasks)) - return ctrl.Result{RequeueAfter: taskRunningRequeue}, nil + return ctrl.Result{RequeueAfter: TaskRunningRequeue}, nil } for _, task := range activeTasks { if task.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent { logger.V(1).Info("this job isn't allowed to run due to ConcurrencyPolicy restrictions", "activeTasks", len(activeTasks)) - return ctrl.Result{RequeueAfter: taskRunningRequeue}, nil + return ctrl.Result{RequeueAfter: TaskRunningRequeue}, nil } } } @@ -666,12 +666,12 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc continue } else if details.Status == podJobWaiting { // Job is still running or waiting - return ctrl.Result{RequeueAfter: jobRunningRequeue}, failed, completed, nil + return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, nil } } else { if len(jobRunner) > 0 { // Something is still holding the worker - return ctrl.Result{RequeueAfter: jobRunningRequeue}, failed, completed, nil + return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, nil } // Nothing is holding the job, this pod has finished @@ -761,7 +761,7 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc } // We have a job going on, return back later to check the status - return ctrl.Result{RequeueAfter: jobRunningRequeue}, failed, completed, nil + return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, nil } return ctrl.Result{}, failed, completed, nil } diff --git a/internal/controllers/control/cassandratask_controller_test.go b/internal/controllers/control/cassandratask_controller_test.go index a9f4a809..02ca3c66 100644 --- a/internal/controllers/control/cassandratask_controller_test.go +++ b/internal/controllers/control/cassandratask_controller_test.go @@ -227,8 +227,8 @@ func waitForTaskCompletion(taskKey types.NamespacedName) *api.CassandraTask { var _ = Describe("CassandraTask controller tests", func() { Describe("Execute jobs against all pods", func() { - jobRunningRequeue = time.Duration(1 * time.Millisecond) - taskRunningRequeue = time.Duration(1 * time.Millisecond) + JobRunningRequeue = time.Duration(1 * time.Millisecond) + TaskRunningRequeue = time.Duration(1 * time.Millisecond) Context("Async jobs", func() { var testNamespaceName string BeforeEach(func() { diff --git a/internal/controllers/control/jobs.go b/internal/controllers/control/jobs.go index e2a93a2a..72b5d04b 100644 --- a/internal/controllers/control/jobs.go +++ b/internal/controllers/control/jobs.go @@ -92,14 +92,14 @@ func (r *CassandraTaskReconciler) restartSts(ctx context.Context, sts []appsv1.S restartedPods += int(status.UpdatedReplicas) taskConfig.Completed = restartedPods // This is still restarting - return ctrl.Result{RequeueAfter: jobRunningRequeue}, nil + 
return ctrl.Result{RequeueAfter: JobRunningRequeue}, nil } st.Spec.Template.ObjectMeta.Annotations[api.RestartedAtAnnotation] = restartTime if err := r.Client.Update(ctx, &st); err != nil { return ctrl.Result{}, err } - return ctrl.Result{RequeueAfter: jobRunningRequeue}, nil + return ctrl.Result{RequeueAfter: JobRunningRequeue}, nil } // We're done diff --git a/internal/envtest/fake_mgmtapi.go b/internal/envtest/fake_mgmtapi.go new file mode 100644 index 00000000..4b4fa212 --- /dev/null +++ b/internal/envtest/fake_mgmtapi.go @@ -0,0 +1,122 @@ +package envtest + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "strconv" + "sync" + + "github.com/go-logr/logr" + "github.com/k8ssandra/cass-operator/pkg/httphelper" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +var ( + nodeCounter int = 0 + epMutex sync.Mutex + epData httphelper.CassMetadataEndpoints +) + +func FakeServer(cli client.Client, logger logr.Logger, podKey types.NamespacedName) (*httptest.Server, error) { + // We're only interested in the PodName + PodNamespace.. + epMutex.Lock() + myNodeName := strconv.Itoa(nodeCounter) + if epData.Entity == nil { + epData.Entity = make([]httphelper.EndpointState, 0, 1) + } + + epData.Entity = append(epData.Entity, httphelper.EndpointState{ + HostID: myNodeName, + EndpointIP: "127.0.0.1", + NativeAddressAndPort: "127.0.0.1:9042", + NativeTransportAddress: "127.0.0.1", + ReleaseVersion: "4.0.7", + Status: string(httphelper.StatusNormal), + }) + + nodeCounter++ + + epMutex.Unlock() + + handlerFunc := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + query, err := url.ParseQuery(r.URL.RawQuery) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + } + + if r.Method == http.MethodPost { + switch r.URL.Path { + case "/api/v0/ops/seeds/reload": + w.WriteHeader(http.StatusOK) + case "/api/v0/lifecycle/start": + logger.Info(fmt.Sprintf("Received call to start the pod.. 
%v", podKey)) + + pod := &corev1.Pod{} + if err := cli.Get(context.TODO(), podKey, pod); err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + patchPod := client.MergeFrom(pod.DeepCopy()) + pod.Status.ContainerStatuses[0].Ready = true + if err := cli.Status().Patch(context.TODO(), pod, patchPod); err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusCreated) + case "/api/v0/ops/auth/role": + w.WriteHeader(http.StatusCreated) + case "/api/v0/ops/node/decommission": + epMutex.Lock() + for i := 0; i < len(epData.Entity); i++ { + if epData.Entity[i].HostID == myNodeName { + epData.Entity[i].Status = string(httphelper.StatusLeft) + } + } + epMutex.Unlock() + w.WriteHeader(http.StatusOK) + case "/api/v0/ops/keyspace/cleanup": + w.WriteHeader(http.StatusOK) + default: + w.WriteHeader(http.StatusNotFound) + } + } else if r.Method == http.MethodGet { + switch r.URL.Path { + case "/api/v0/probes/cluster": + // Health check call + consistency := query.Get("consistency_level") + rf := query.Get("rf_per_dc") + if consistency != "LOCAL_QUORUM" || rf == "" { + w.WriteHeader(http.StatusBadRequest) + break + } + w.WriteHeader(http.StatusOK) + case "/api/v0/metadata/endpoints": + b, err := json.Marshal(&epData) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + break + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write(b) + default: + w.WriteHeader(http.StatusNotFound) + } + } + + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + } + + }) + + managementMockServer := httptest.NewServer(handlerFunc) + return managementMockServer, nil +} diff --git a/internal/envtest/statefulset_controller.go b/internal/envtest/statefulset_controller.go new file mode 100644 index 00000000..e831b685 --- /dev/null +++ b/internal/envtest/statefulset_controller.go @@ -0,0 +1,274 @@ +package envtest + +import ( + "context" + "fmt" + "strconv" + "strings" + "time" + + "github.com/k8ssandra/cass-operator/pkg/httphelper" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +// StatefulSetReconciler is intended to be used with envtests. 
It mimics some parts of the StS controller to hide such details from the tests +type StatefulSetReconciler struct { + client.Client + Scheme *runtime.Scheme +} + +// Kubebuilder rights intentionally omitted - this only works in the envtest without rights restrictions +func (r *StatefulSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := log.FromContext(ctx) + logger.Info("Processing StatefulSet", "NamespacedName", req.NamespacedName) + + var sts appsv1.StatefulSet + if err := r.Get(ctx, req.NamespacedName, &sts); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } + + intendedReplicas := int(*sts.Spec.Replicas) + + if sts.GetDeletionTimestamp() != nil { + logger.Info("StatefulSet has been marked for deletion") + // Delete the pods + for i := 0; i < intendedReplicas; i++ { + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%d", sts.Name, i), + Namespace: sts.Namespace, + }, + } + + if err := r.Client.Delete(ctx, pod); err != nil { + logger.Error(err, "Failed to delete the pod") + return ctrl.Result{}, err + } + } + + if controllerutil.RemoveFinalizer(&sts, "test.k8ssandra.io/sts-finalizer") { + if err := r.Client.Update(ctx, &sts); err != nil { + logger.Error(err, "Failed to remove finalizer from StatefulSet") + return ctrl.Result{}, err + } + } + + return ctrl.Result{}, nil + } + + if controllerutil.AddFinalizer(&sts, "test.k8ssandra.io/sts-finalizer") { + if err := r.Client.Update(ctx, &sts); err != nil { + logger.Error(err, "Failed to set finalizer to StatefulSet") + return ctrl.Result{}, err + } + } + + // TODO Implement new parts as needed for tests + // TODO Get existing pods and modify them . + + podList := &corev1.PodList{} + if err := r.Client.List(ctx, podList, client.MatchingLabels(sts.Spec.Template.Labels), client.InNamespace(req.Namespace)); err != nil { + logger.Error(err, "Failed to list the pods belonging to this StatefulSet") + return ctrl.Result{}, err + } + + stsPods := []*corev1.Pod{} + for _, pod := range podList.Items { + if pod.OwnerReferences[0].Name == sts.Name { + stsPods = append(stsPods, &pod) + } + } + + if len(stsPods) > intendedReplicas { + // We need to delete the pods.. + for i := len(stsPods) - 1; i > intendedReplicas; i-- { + pod := stsPods[i] + if err := r.Client.Delete(ctx, pod); err != nil { + logger.Error(err, "Failed to delete extra pod from this StS") + return ctrl.Result{}, err + } + } + return ctrl.Result{}, nil + } + + for i := 0; i < intendedReplicas; i++ { + if i <= len(stsPods)-1 { + continue + } + + podKey := types.NamespacedName{ + Name: fmt.Sprintf("%s-%d", sts.Name, i), + Namespace: sts.Namespace, + } + + pod := &corev1.Pod{ + // Technically this comes from a combination of Template.ObjectMeta, but we're just adding some fields + ObjectMeta: metav1.ObjectMeta{ + Name: podKey.Name, + Namespace: podKey.Namespace, + Labels: sts.Spec.Template.Labels, + }, + Spec: sts.Spec.Template.Spec, + } + + // tbh, why do we need to add this here..? + pod.Spec.Volumes = append(pod.Spec.Volumes, + corev1.Volume{ + Name: "server-data", + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: fmt.Sprintf("server-data-%s", pod.Name), + }, + }, + }) + + if err := createPVC(ctx, r.Client, "server-data", pod); err != nil { + logger.Error(err, "Failed to create PVC server-data", "Pod", podKey.Name) + return ctrl.Result{}, err + } + + // Create management API here and use its port.. 
+ managementMockServer, err := FakeServer(r.Client, logger, podKey) + if err != nil { + logger.Error(err, "Failed to create fake mgmtApiServer") + return ctrl.Result{}, err + } + ipPort := strings.Split(managementMockServer.Listener.Addr().String(), ":") + port, err := strconv.Atoi(ipPort[1]) + if err != nil { + logger.Error(err, "Failed to parse httptest server's port") + return ctrl.Result{}, err + } + + // TODO Change Liveness and Readiness ports also + for c := 0; c < len(pod.Spec.Containers); c++ { + if pod.Spec.Containers[c].Name == "cassandra" { + for i := 0; i < len(pod.Spec.Containers[c].Ports); i++ { + if pod.Spec.Containers[c].Ports[i].Name == "mgmt-api-http" { + pod.Spec.Containers[c].Ports[i].ContainerPort = int32(port) + break + } + } + pod.Spec.Containers[c].LivenessProbe = probe(port, httphelper.LivenessEndpoint, 15, 15, 10) + pod.Spec.Containers[c].ReadinessProbe = probe(port, httphelper.ReadinessEndpoint, 20, 10, 10) + break + } + } + + if err := controllerutil.SetControllerReference(&sts, pod, r.Scheme); err != nil { + return ctrl.Result{}, err + } + + if err := r.Client.Create(context.TODO(), pod); err != nil { + logger.Error(err, "Failed to create a Pod") + return ctrl.Result{}, err + } + + podIP := "127.0.0.1" + patchPod := client.MergeFrom(pod.DeepCopy()) + pod.Status = corev1.PodStatus{ + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "cassandra", + State: corev1.ContainerState{ + Running: &corev1.ContainerStateRunning{ + StartedAt: metav1.NewTime(time.Now().Add(time.Second * -10)), + }, + }, + }, + }, + Phase: corev1.PodRunning, + PodIP: podIP, + PodIPs: []corev1.PodIP{{IP: podIP}}} + if err := r.Client.Status().Patch(ctx, pod, patchPod); err != nil { + logger.Error(err, "Failed to patch the Pod") + return ctrl.Result{}, err + } + } + + // Update StS status + patchSts := client.MergeFrom(sts.DeepCopy()) + sts.Status.ReadyReplicas = int32(intendedReplicas) + sts.Status.UpdatedReplicas = int32(intendedReplicas) + sts.Status.AvailableReplicas = int32(intendedReplicas) + sts.Status.CurrentReplicas = int32(intendedReplicas) + sts.Status.Replicas = int32(intendedReplicas) + sts.Status.ObservedGeneration = sts.Generation + + if err := r.Client.Status().Patch(ctx, &sts, patchSts); err != nil { + logger.Error(err, "Failed to patch StatefulSet status") + return ctrl.Result{}, err + } + + return ctrl.Result{}, nil +} + +func probe(port int, path string, initDelay int, period int, timeout int) *corev1.Probe { + return &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Port: intstr.FromInt(port), + Path: path, + }, + }, + InitialDelaySeconds: int32(initDelay), + PeriodSeconds: int32(period), + TimeoutSeconds: int32(timeout), + } +} + +// TODO These should be created to test certain decommission features also +func createPVC(ctx context.Context, cli client.Client, mountName string, pod *corev1.Pod) error { + volumeMode := new(corev1.PersistentVolumeMode) + *volumeMode = corev1.PersistentVolumeFilesystem + storageClassName := "default" + + pvcName := types.NamespacedName{ + Name: fmt.Sprintf("%s-%s", mountName, pod.Name), + Namespace: pod.Namespace, + } + + pvc := &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: pvcName.Name, + Namespace: pvcName.Namespace, + Labels: pod.Labels, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + AccessModes: []corev1.PersistentVolumeAccessMode{ + corev1.ReadWriteOnce, + }, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + // TODO Hardcoded not real 
value + corev1.ResourceStorage: resource.MustParse("1Gi"), + }, + }, + StorageClassName: &storageClassName, + VolumeMode: volumeMode, + VolumeName: "pvc-" + pvcName.Name, + }, + } + + return cli.Create(ctx, pvc) +} + +// SetupWithManager sets up the controller with the Manager. +func (r *StatefulSetReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&appsv1.StatefulSet{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). + Complete(r) +} diff --git a/pkg/internal/result/result_helper.go b/internal/result/result_helper.go similarity index 89% rename from pkg/internal/result/result_helper.go rename to internal/result/result_helper.go index c8e98778..d8e1a3a8 100644 --- a/pkg/internal/result/result_helper.go +++ b/internal/result/result_helper.go @@ -9,6 +9,8 @@ import ( ctrl "sigs.k8s.io/controller-runtime" ) +var DurationFunc func(int) time.Duration = func(secs int) time.Duration { return time.Duration(secs) * time.Second } + type ReconcileResult interface { Completed() bool Output() (ctrl.Result, error) @@ -40,7 +42,7 @@ func (c callBackSoon) Completed() bool { return true } func (c callBackSoon) Output() (ctrl.Result, error) { - t := time.Duration(c.secs) * time.Second + t := DurationFunc(c.secs) return ctrl.Result{Requeue: true, RequeueAfter: t}, nil } diff --git a/pkg/internal/result/result_helper_test.go b/internal/result/result_helper_test.go similarity index 100% rename from pkg/internal/result/result_helper_test.go rename to internal/result/result_helper_test.go diff --git a/pkg/httphelper/client.go b/pkg/httphelper/client.go index 86474130..a2821d01 100644 --- a/pkg/httphelper/client.go +++ b/pkg/httphelper/client.go @@ -34,6 +34,7 @@ type NodeMgmtClient struct { type nodeMgmtRequest struct { endpoint string host string + port int method string timeout time.Duration body []byte @@ -184,16 +185,29 @@ func NewMgmtClient(ctx context.Context, client client.Client, dc *cassdcapi.Cass }, nil } -func BuildPodHostFromPod(pod *corev1.Pod) (string, error) { +func BuildPodHostFromPod(pod *corev1.Pod) (string, int, error) { // This function previously returned the dns hostname which includes the StatefulSet's headless service, // which is the datacenter service. There are times though that we want to make a mgmt api call to the pod // before the dns hostnames are available. It is therefore more reliable to simply use the PodIP. 
if len(pod.Status.PodIP) == 0 { - return "", newNoPodIPError(pod) + return "", 0, newNoPodIPError(pod) } - return pod.Status.PodIP, nil + mgmtApiPort := 8080 + + // Check for port override + for _, container := range pod.Spec.Containers { + if container.Name == "cassandra" { + for _, port := range container.Ports { + if port.Name == "mgmt-api-http" { + mgmtApiPort = int(port.ContainerPort) + } + } + } + } + + return pod.Status.PodIP, mgmtApiPort, nil } func GetPodHost(podName, clusterName, dcName, namespace string) string { @@ -213,7 +227,7 @@ func parseMetadataEndpointsResponseBody(body []byte) (*CassMetadataEndpoints, er func (client *NodeMgmtClient) CallMetadataEndpointsEndpoint(pod *corev1.Pod) (CassMetadataEndpoints, error) { client.Log.Info("requesting Cassandra metadata endpoints from Node Management API", "pod", pod.Name) - podHost, err := BuildPodHostFromPod(pod) + podHost, podPort, err := BuildPodHostFromPod(pod) if err != nil { return CassMetadataEndpoints{}, err } @@ -221,6 +235,7 @@ func (client *NodeMgmtClient) CallMetadataEndpointsEndpoint(pod *corev1.Pod) (Ca request := nodeMgmtRequest{ endpoint: "/api/v0/metadata/endpoints", host: podHost, + port: podPort, method: http.MethodGet, timeout: 60 * time.Second, } @@ -247,7 +262,7 @@ func (client *NodeMgmtClient) CallMetadataEndpointsEndpoint(pod *corev1.Pod) (Ca // // A map length of 1 indicates schema agreement. func (client *NodeMgmtClient) CallSchemaVersionsEndpoint(pod *corev1.Pod) (map[string][]string, error) { - podHost, err := BuildPodHostFromPod(pod) + podHost, podPort, err := BuildPodHostFromPod(pod) if err != nil { return nil, err } @@ -255,6 +270,7 @@ func (client *NodeMgmtClient) CallSchemaVersionsEndpoint(pod *corev1.Pod) (map[s request := nodeMgmtRequest{ endpoint: "/api/v1/ops/node/schema/versions", host: podHost, + port: podPort, method: http.MethodGet, timeout: 60 * time.Second, } @@ -285,7 +301,7 @@ func (client *NodeMgmtClient) CallCreateRoleEndpoint(pod *corev1.Pod, username s postData.Set("can_login", "true") postData.Set("is_superuser", strconv.FormatBool(superuser)) - podHost, err := BuildPodHostFromPod(pod) + podHost, podPort, err := BuildPodHostFromPod(pod) if err != nil { return err } @@ -293,6 +309,7 @@ func (client *NodeMgmtClient) CallCreateRoleEndpoint(pod *corev1.Pod, username s request := nodeMgmtRequest{ endpoint: fmt.Sprintf("/api/v0/ops/auth/role?%s", postData.Encode()), host: podHost, + port: podPort, method: http.MethodPost, timeout: 60 * time.Second, } @@ -310,7 +327,7 @@ func (client *NodeMgmtClient) CallProbeClusterEndpoint(pod *corev1.Pod, consiste "pod", pod.Name, ) - podHost, err := BuildPodHostFromPod(pod) + podHost, podPort, err := BuildPodHostFromPod(pod) if err != nil { return err } @@ -318,6 +335,7 @@ func (client *NodeMgmtClient) CallProbeClusterEndpoint(pod *corev1.Pod, consiste request := nodeMgmtRequest{ endpoint: fmt.Sprintf("/api/v0/probes/cluster?consistency_level=%s&rf_per_dc=%d", consistencyLevel, rfPerDc), host: podHost, + port: podPort, method: http.MethodGet, timeout: 60 * time.Second, } @@ -332,7 +350,7 @@ func (client *NodeMgmtClient) CallDrainEndpoint(pod *corev1.Pod) error { "pod", pod.Name, ) - podHost, err := BuildPodHostFromPod(pod) + podHost, podPort, err := BuildPodHostFromPod(pod) if err != nil { return err } @@ -340,6 +358,7 @@ func (client *NodeMgmtClient) CallDrainEndpoint(pod *corev1.Pod) error { request := nodeMgmtRequest{ endpoint: "/api/v0/ops/node/drain", host: podHost, + port: podPort, method: http.MethodPost, timeout: time.Minute * 2, } @@ -383,7 
+402,7 @@ func createKeySpaceRequest(pod *corev1.Pod, jobs int, keyspaceName string, table return nil, err } - podHost, err := BuildPodHostFromPod(pod) + podHost, podPort, err := BuildPodHostFromPod(pod) if err != nil { return nil, err } @@ -391,6 +410,7 @@ func createKeySpaceRequest(pod *corev1.Pod, jobs int, keyspaceName string, table request := &nodeMgmtRequest{ endpoint: endpoint, host: podHost, + port: podPort, method: http.MethodPost, body: body, } @@ -427,7 +447,7 @@ func (client *NodeMgmtClient) CallDatacenterRebuild(pod *corev1.Pod, sourceDatac "pod", pod.Name, ) - podHost, err := BuildPodHostFromPod(pod) + podHost, podPort, err := BuildPodHostFromPod(pod) if err != nil { return "", err } @@ -437,6 +457,7 @@ func (client *NodeMgmtClient) CallDatacenterRebuild(pod *corev1.Pod, sourceDatac req := nodeMgmtRequest{ endpoint: queryUrl, host: podHost, + port: podPort, method: http.MethodPost, timeout: 60 * time.Second, } @@ -504,7 +525,7 @@ func createCompactRequest(pod *corev1.Pod, compactRequest *CompactRequest, endpo return nil, err } - podHost, err := BuildPodHostFromPod(pod) + podHost, podPort, err := BuildPodHostFromPod(pod) if err != nil { return nil, err } @@ -512,6 +533,7 @@ func createCompactRequest(pod *corev1.Pod, compactRequest *CompactRequest, endpo request := &nodeMgmtRequest{ endpoint: endpoint, host: podHost, + port: podPort, method: http.MethodPost, body: body, } @@ -578,7 +600,7 @@ func createScrubRequest(pod *corev1.Pod, scrubRequest *ScrubRequest, endpoint st return nil, err } - podHost, err := BuildPodHostFromPod(pod) + podHost, podPort, err := BuildPodHostFromPod(pod) if err != nil { return nil, err } @@ -586,6 +608,7 @@ func createScrubRequest(pod *corev1.Pod, scrubRequest *ScrubRequest, endpoint st request := &nodeMgmtRequest{ endpoint: endpoint, host: podHost, + port: podPort, method: http.MethodPost, body: body, } @@ -659,7 +682,7 @@ func (client *NodeMgmtClient) modifyKeyspace(endpoint string, pod *corev1.Pod, k return err } - podHost, err := BuildPodHostFromPod(pod) + podHost, podPort, err := BuildPodHostFromPod(pod) if err != nil { return err } @@ -667,6 +690,7 @@ func (client *NodeMgmtClient) modifyKeyspace(endpoint string, pod *corev1.Pod, k request := nodeMgmtRequest{ endpoint: fmt.Sprintf("/api/v0/ops/keyspace/%s", endpoint), host: podHost, + port: podPort, method: http.MethodPost, timeout: time.Second * 20, body: body, @@ -686,7 +710,7 @@ func parseListKeyspacesEndpointsResponseBody(body []byte) ([]string, error) { // GetKeyspace calls the management API to check if a specific keyspace exists func (client *NodeMgmtClient) GetKeyspace(pod *corev1.Pod, keyspaceName string) ([]string, error) { - podHost, err := BuildPodHostFromPod(pod) + podHost, podPort, err := BuildPodHostFromPod(pod) if err != nil { return nil, err } @@ -697,6 +721,7 @@ func (client *NodeMgmtClient) GetKeyspace(pod *corev1.Pod, keyspaceName string) request := nodeMgmtRequest{ endpoint: endpoint, host: podHost, + port: podPort, method: http.MethodGet, timeout: time.Second * 20, } @@ -722,7 +747,7 @@ func (client *NodeMgmtClient) GetKeyspaceReplication(pod *corev1.Pod, keyspaceNa if keyspaceName == "" { return nil, fmt.Errorf("keyspace name cannot be empty") } - podHost, err := BuildPodHostFromPod(pod) + podHost, podPort, err := BuildPodHostFromPod(pod) if err != nil { return nil, err } @@ -730,6 +755,7 @@ func (client *NodeMgmtClient) GetKeyspaceReplication(pod *corev1.Pod, keyspaceNa request := nodeMgmtRequest{ endpoint: endpoint, host: podHost, + port: podPort, method: http.MethodGet, 
timeout: time.Second * 20, } @@ -749,7 +775,7 @@ func (client *NodeMgmtClient) ListTables(pod *corev1.Pod, keyspaceName string) ( if keyspaceName == "" { return nil, fmt.Errorf("keyspace name cannot be empty") } - podHost, err := BuildPodHostFromPod(pod) + podHost, podPort, err := BuildPodHostFromPod(pod) if err != nil { return nil, err } @@ -757,6 +783,7 @@ func (client *NodeMgmtClient) ListTables(pod *corev1.Pod, keyspaceName string) ( request := nodeMgmtRequest{ endpoint: endpoint, host: podHost, + port: podPort, method: http.MethodGet, timeout: time.Second * 20, } @@ -861,7 +888,7 @@ func (client *NodeMgmtClient) CreateTable(pod *corev1.Pod, table *TableDefinitio if err != nil { return err } - podHost, err := BuildPodHostFromPod(pod) + podHost, podPort, err := BuildPodHostFromPod(pod) if err != nil { return err } @@ -869,6 +896,7 @@ func (client *NodeMgmtClient) CreateTable(pod *corev1.Pod, table *TableDefinitio request := nodeMgmtRequest{ endpoint: endpoint, host: podHost, + port: podPort, method: http.MethodPost, timeout: time.Second * 40, body: body, @@ -878,14 +906,15 @@ func (client *NodeMgmtClient) CreateTable(pod *corev1.Pod, table *TableDefinitio } func (client *NodeMgmtClient) CallLifecycleStartEndpointWithReplaceIp(pod *corev1.Pod, replaceIp string) error { - // talk to the pod via IP because we are dialing up a pod that isn't ready, - // so it won't be reachable via the service and pod DNS - podIP := pod.Status.PodIP + podHost, podPort, err := BuildPodHostFromPod(pod) + if err != nil { + return err + } client.Log.Info( "calling Management API start node - POST /api/v0/lifecycle/start", "pod", pod.Name, - "podIP", podIP, + "podIP", podHost, "replaceIP", replaceIp, ) @@ -897,12 +926,13 @@ func (client *NodeMgmtClient) CallLifecycleStartEndpointWithReplaceIp(pod *corev request := nodeMgmtRequest{ endpoint: endpoint, - host: podIP, + host: podHost, + port: podPort, method: http.MethodPost, timeout: 10 * time.Minute, } - _, err := callNodeMgmtEndpoint(client, request, "") + _, err = callNodeMgmtEndpoint(client, request, "") return err } @@ -916,7 +946,7 @@ func (client *NodeMgmtClient) CallReloadSeedsEndpoint(pod *corev1.Pod) error { "pod", pod.Name, ) - podHost, err := BuildPodHostFromPod(pod) + podHost, podPort, err := BuildPodHostFromPod(pod) if err != nil { return err } @@ -924,6 +954,7 @@ func (client *NodeMgmtClient) CallReloadSeedsEndpoint(pod *corev1.Pod) error { request := nodeMgmtRequest{ endpoint: "/api/v0/ops/seeds/reload", host: podHost, + port: podPort, method: http.MethodPost, timeout: 60 * time.Second, } @@ -939,7 +970,7 @@ func (client *NodeMgmtClient) CallDecommissionNodeEndpoint(pod *corev1.Pod) erro "pod", pod.Name, ) - podHost, err := BuildPodHostFromPod(pod) + podHost, podPort, err := BuildPodHostFromPod(pod) if err != nil { return err } @@ -947,6 +978,7 @@ func (client *NodeMgmtClient) CallDecommissionNodeEndpoint(pod *corev1.Pod) erro request := nodeMgmtRequest{ endpoint: "/api/v0/ops/node/decommission?force=true", host: podHost, + port: podPort, method: http.MethodPost, timeout: 60 * time.Second, } @@ -962,7 +994,7 @@ func (client *NodeMgmtClient) CallDecommissionNode(pod *corev1.Pod, force bool) "pod", pod.Name, ) - podHost, err := BuildPodHostFromPod(pod) + podHost, podPort, err := BuildPodHostFromPod(pod) if err != nil { return "", err } @@ -972,6 +1004,7 @@ func (client *NodeMgmtClient) CallDecommissionNode(pod *corev1.Pod, force bool) req := nodeMgmtRequest{ endpoint: queryUrl, host: podHost, + port: podPort, method: http.MethodPost, timeout: 60 * 
time.Second, } @@ -992,7 +1025,7 @@ func (client *NodeMgmtClient) FeatureSet(pod *corev1.Pod) (*FeatureSet, error) { "pod", pod.Name, ) - podHost, err := BuildPodHostFromPod(pod) + podHost, podPort, err := BuildPodHostFromPod(pod) if err != nil { return nil, err } @@ -1000,6 +1033,7 @@ func (client *NodeMgmtClient) FeatureSet(pod *corev1.Pod) (*FeatureSet, error) { request := nodeMgmtRequest{ endpoint: "/api/v0/metadata/versions/features", host: podHost, + port: podPort, method: http.MethodGet, } @@ -1028,7 +1062,7 @@ func (client *NodeMgmtClient) JobDetails(pod *corev1.Pod, jobId string) (*JobDet "pod", pod.Name, ) - podHost, err := BuildPodHostFromPod(pod) + podHost, podPort, err := BuildPodHostFromPod(pod) if err != nil { return nil, err } @@ -1036,6 +1070,7 @@ func (client *NodeMgmtClient) JobDetails(pod *corev1.Pod, jobId string) (*JobDet request := nodeMgmtRequest{ endpoint: fmt.Sprintf("/api/v0/ops/executor/job?job_id=%s", jobId), host: podHost, + port: podPort, method: http.MethodGet, } @@ -1065,7 +1100,7 @@ func (client *NodeMgmtClient) CallMove(pod *corev1.Pod, newToken string) (string "newToken", newToken, ) - podHost, err := BuildPodHostFromPod(pod) + podHost, podPort, err := BuildPodHostFromPod(pod) if err != nil { return "", err } @@ -1075,6 +1110,7 @@ func (client *NodeMgmtClient) CallMove(pod *corev1.Pod, newToken string) (string req := nodeMgmtRequest{ endpoint: queryUrl, host: podHost, + port: podPort, method: http.MethodPost, timeout: 60 * time.Second, } @@ -1089,7 +1125,12 @@ func (client *NodeMgmtClient) CallMove(pod *corev1.Pod, newToken string) (string func callNodeMgmtEndpoint(client *NodeMgmtClient, request nodeMgmtRequest, contentType string) ([]byte, error) { client.Log.Info("client::callNodeMgmtEndpoint") - url := fmt.Sprintf("%s://%s:8080%s", client.Protocol, request.host, request.endpoint) + port := 8080 + if request.port > 0 { + port = request.port + } + + url := fmt.Sprintf("%s://%s:%d%s", client.Protocol, request.host, port, request.endpoint) var reqBody io.Reader if len(request.body) > 0 { @@ -1167,13 +1208,14 @@ func (client *NodeMgmtClient) CallIsFullQueryLogEnabledEndpoint(pod *corev1.Pod) "pod", pod.Name, ) - podHost, err := BuildPodHostFromPod(pod) + podHost, podPort, err := BuildPodHostFromPod(pod) if err != nil { return false, err } request := nodeMgmtRequest{ endpoint: "/api/v0/ops/node/fullquerylogging", host: podHost, + port: podPort, method: http.MethodGet, timeout: time.Minute * 2, } @@ -1204,13 +1246,14 @@ func (client *NodeMgmtClient) CallIsFullQueryLogEnabledEndpoint(pod *corev1.Pod) func (client *NodeMgmtClient) CallSetFullQueryLog(pod *corev1.Pod, enableFullQueryLogging bool) error { client.Log.Info("client::callIsFullQueryLogEnabledEndpoint") - podHost, err := BuildPodHostFromPod(pod) + podHost, podPort, err := BuildPodHostFromPod(pod) if err != nil { return err } request := nodeMgmtRequest{ endpoint: "/api/v0/ops/node/fullquerylogging?enabled=" + strconv.FormatBool(enableFullQueryLogging), host: podHost, + port: podPort, method: http.MethodPost, timeout: time.Minute * 2, } diff --git a/pkg/httphelper/client_test.go b/pkg/httphelper/client_test.go index f511dcbf..3b05f8ef 100644 --- a/pkg/httphelper/client_test.go +++ b/pkg/httphelper/client_test.go @@ -38,10 +38,11 @@ func Test_BuildPodHostFromPod(t *testing.T) { }, } - result, err := BuildPodHostFromPod(pod) + result, podPort, err := BuildPodHostFromPod(pod) assert.NoError(t, err) expected := "1.2.3.4" + assert.Equal(t, 8080, podPort) assert.Equal(t, expected, result) } diff --git 
a/pkg/reconciliation/constructor.go b/pkg/reconciliation/constructor.go index c20983d2..3f03ae00 100644 --- a/pkg/reconciliation/constructor.go +++ b/pkg/reconciliation/constructor.go @@ -44,6 +44,7 @@ func newPodDisruptionBudgetForDatacenter(dc *api.CassandraDatacenter) *policyv1. } func setOperatorProgressStatus(rc *ReconciliationContext, newState api.ProgressState) error { + rc.ReqLogger.Info("reconcile_racks::setOperatorProgressStatus") currentState := rc.Datacenter.Status.CassandraOperatorProgress if currentState == newState { // early return, no need to ping k8s diff --git a/pkg/reconciliation/decommission_node.go b/pkg/reconciliation/decommission_node.go index ecf27826..f8da9e7f 100644 --- a/pkg/reconciliation/decommission_node.go +++ b/pkg/reconciliation/decommission_node.go @@ -12,9 +12,9 @@ import ( "github.com/go-logr/logr" api "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" + "github.com/k8ssandra/cass-operator/internal/result" "github.com/k8ssandra/cass-operator/pkg/events" "github.com/k8ssandra/cass-operator/pkg/httphelper" - "github.com/k8ssandra/cass-operator/pkg/internal/result" "github.com/k8ssandra/cass-operator/pkg/monitoring" "k8s.io/apimachinery/pkg/types" ) diff --git a/pkg/reconciliation/decommission_node_test.go b/pkg/reconciliation/decommission_node_test.go index 44c2dfb2..91a624ca 100644 --- a/pkg/reconciliation/decommission_node_test.go +++ b/pkg/reconciliation/decommission_node_test.go @@ -11,8 +11,8 @@ import ( "testing" api "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" + "github.com/k8ssandra/cass-operator/internal/result" "github.com/k8ssandra/cass-operator/pkg/httphelper" - "github.com/k8ssandra/cass-operator/pkg/internal/result" "github.com/k8ssandra/cass-operator/pkg/mocks" "github.com/stretchr/testify/mock" appsv1 "k8s.io/api/apps/v1" diff --git a/pkg/reconciliation/handler.go b/pkg/reconciliation/handler.go index a84137e1..dd015714 100644 --- a/pkg/reconciliation/handler.go +++ b/pkg/reconciliation/handler.go @@ -13,8 +13,8 @@ import ( "k8s.io/apimachinery/pkg/types" api "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" + "github.com/k8ssandra/cass-operator/internal/result" "github.com/k8ssandra/cass-operator/pkg/httphelper" - "github.com/k8ssandra/cass-operator/pkg/internal/result" ) // Use a var so we can mock this function diff --git a/pkg/reconciliation/reconcile_configsecret.go b/pkg/reconciliation/reconcile_configsecret.go index e79d02e7..038c3b76 100644 --- a/pkg/reconciliation/reconcile_configsecret.go +++ b/pkg/reconciliation/reconcile_configsecret.go @@ -8,8 +8,8 @@ import ( "fmt" api "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" + "github.com/k8ssandra/cass-operator/internal/result" "github.com/k8ssandra/cass-operator/pkg/cdc" - "github.com/k8ssandra/cass-operator/pkg/internal/result" "github.com/k8ssandra/cass-operator/pkg/utils" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" diff --git a/pkg/reconciliation/reconcile_datacenter.go b/pkg/reconciliation/reconcile_datacenter.go index d075b3f3..d9ff927e 100644 --- a/pkg/reconciliation/reconcile_datacenter.go +++ b/pkg/reconciliation/reconcile_datacenter.go @@ -4,8 +4,8 @@ package reconciliation import ( + "github.com/k8ssandra/cass-operator/internal/result" "github.com/k8ssandra/cass-operator/pkg/events" - "github.com/k8ssandra/cass-operator/pkg/internal/result" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" diff --git a/pkg/reconciliation/reconcile_endpoints.go 
b/pkg/reconciliation/reconcile_endpoints.go index 87e6cfa5..7c9b08a6 100644 --- a/pkg/reconciliation/reconcile_endpoints.go +++ b/pkg/reconciliation/reconcile_endpoints.go @@ -5,7 +5,7 @@ package reconciliation import ( api "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" - "github.com/k8ssandra/cass-operator/pkg/internal/result" + "github.com/k8ssandra/cass-operator/internal/result" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" diff --git a/pkg/reconciliation/reconcile_fql.go b/pkg/reconciliation/reconcile_fql.go index cb838ac6..ee3a1143 100644 --- a/pkg/reconciliation/reconcile_fql.go +++ b/pkg/reconciliation/reconcile_fql.go @@ -3,12 +3,13 @@ package reconciliation import ( "errors" + "github.com/k8ssandra/cass-operator/internal/result" "github.com/k8ssandra/cass-operator/pkg/httphelper" - "github.com/k8ssandra/cass-operator/pkg/internal/result" ) // CheckFullQueryLogging sets FQL enabled or disabled. It calls the NodeMgmtClient which calls the Cassandra management API and returns a result.ReconcileResult. func (rc *ReconciliationContext) CheckFullQueryLogging() result.ReconcileResult { + rc.ReqLogger.Info("reconcile_racks::CheckFullQueryLogging") dc := rc.GetDatacenter() if !dc.DeploymentSupportsFQL() { return result.Continue() diff --git a/pkg/reconciliation/reconcile_fql_test.go b/pkg/reconciliation/reconcile_fql_test.go index 7e522973..eb98d0ac 100644 --- a/pkg/reconciliation/reconcile_fql_test.go +++ b/pkg/reconciliation/reconcile_fql_test.go @@ -8,8 +8,8 @@ import ( "testing" "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" + "github.com/k8ssandra/cass-operator/internal/result" "github.com/k8ssandra/cass-operator/pkg/httphelper" - "github.com/k8ssandra/cass-operator/pkg/internal/result" "github.com/k8ssandra/cass-operator/pkg/mocks" "github.com/stretchr/testify/mock" corev1 "k8s.io/api/core/v1" diff --git a/pkg/reconciliation/reconcile_racks.go b/pkg/reconciliation/reconcile_racks.go index bab6677d..5200908d 100644 --- a/pkg/reconciliation/reconcile_racks.go +++ b/pkg/reconciliation/reconcile_racks.go @@ -24,9 +24,9 @@ import ( api "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" taskapi "github.com/k8ssandra/cass-operator/apis/control/v1alpha1" + "github.com/k8ssandra/cass-operator/internal/result" "github.com/k8ssandra/cass-operator/pkg/events" "github.com/k8ssandra/cass-operator/pkg/httphelper" - "github.com/k8ssandra/cass-operator/pkg/internal/result" "github.com/k8ssandra/cass-operator/pkg/monitoring" "github.com/k8ssandra/cass-operator/pkg/oplabels" "github.com/k8ssandra/cass-operator/pkg/utils" @@ -37,6 +37,8 @@ var ( ResultShouldRequeueNow reconcile.Result = reconcile.Result{Requeue: true} ResultShouldRequeueSoon reconcile.Result = reconcile.Result{Requeue: true, RequeueAfter: 2 * time.Second} ResultShouldRequeueTenSecs reconcile.Result = reconcile.Result{Requeue: true, RequeueAfter: 10 * time.Second} + + QuietDurationFunc func(int) time.Duration = func(secs int) time.Duration { return time.Duration(secs) * time.Second } ) const ( @@ -1663,7 +1665,7 @@ func (rc *ReconciliationContext) labelServerPodStarting(pod *corev1.Pod) error { func (rc *ReconciliationContext) enableQuietPeriod(seconds int) error { dc := rc.Datacenter - dur := time.Second * time.Duration(seconds) + dur := QuietDurationFunc(seconds) statusPatch := client.MergeFrom(dc.DeepCopy()) dc.Status.QuietPeriod = metav1.NewTime(time.Now().Add(dur)) err := rc.Client.Status().Patch(rc.Ctx, dc, statusPatch) @@ -1816,7 +1818,6 @@ func 
(rc *ReconciliationContext) startOneNodePerRack(endpointData httphelper.Cas if notReady || err != nil { return notReady, err } - } return false, nil } @@ -2065,6 +2066,7 @@ func (rc *ReconciliationContext) setCondition(condition *api.DatacenterCondition } func (rc *ReconciliationContext) CheckConditionInitializedAndReady() result.ReconcileResult { + rc.ReqLogger.Info("reconcile_racks::CheckConditionInitializedAndReady") dc := rc.Datacenter dcPatch := client.MergeFrom(dc.DeepCopy()) logger := rc.ReqLogger @@ -2157,6 +2159,7 @@ func (rc *ReconciliationContext) createTask(command taskapi.CassandraCommand) er } func (rc *ReconciliationContext) CheckClearActionConditions() result.ReconcileResult { + rc.ReqLogger.Info("reconcile_racks::CheckClearActionConditions") dc := rc.Datacenter logger := rc.ReqLogger dcPatch := client.MergeFrom(dc.DeepCopy()) diff --git a/pkg/reconciliation/reconcile_racks_test.go b/pkg/reconciliation/reconcile_racks_test.go index b0e6e93a..aaa9a032 100644 --- a/pkg/reconciliation/reconcile_racks_test.go +++ b/pkg/reconciliation/reconcile_racks_test.go @@ -17,8 +17,8 @@ import ( api "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" taskapi "github.com/k8ssandra/cass-operator/apis/control/v1alpha1" + "github.com/k8ssandra/cass-operator/internal/result" "github.com/k8ssandra/cass-operator/pkg/httphelper" - "github.com/k8ssandra/cass-operator/pkg/internal/result" "github.com/k8ssandra/cass-operator/pkg/mocks" "github.com/k8ssandra/cass-operator/pkg/oplabels" "github.com/k8ssandra/cass-operator/pkg/utils" @@ -1754,6 +1754,7 @@ func TestReconciliationContext_startAllNodes(t *testing.T) { Ready: bool(started), }, } + p.Status.PodIP = "127.0.0.1" if started { p.Labels[api.CassNodeState] = stateStarted } else { @@ -1908,6 +1909,7 @@ func TestStartOneNodePerRack(t *testing.T) { Ready: bool(started), }, } + p.Status.PodIP = "127.0.0.1" if started { p.Labels[api.CassNodeState] = stateStarted } else { diff --git a/pkg/reconciliation/reconcile_services.go b/pkg/reconciliation/reconcile_services.go index a33f9de2..9e9f6719 100644 --- a/pkg/reconciliation/reconcile_services.go +++ b/pkg/reconciliation/reconcile_services.go @@ -5,7 +5,7 @@ package reconciliation import ( api "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" - "github.com/k8ssandra/cass-operator/pkg/internal/result" + "github.com/k8ssandra/cass-operator/internal/result" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types"
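
The envtest setup added here has three cooperating pieces: the suite registers the real CassandraDatacenterReconciler together with the fake StatefulSetReconciler, the fake controller creates pods whose mgmt-api-http named port points at a per-pod httptest server, and that fake server marks the cassandra container Ready when the operator calls /api/v0/lifecycle/start. Below is a condensed, non-authoritative sketch of the port wiring only; wireFakeMgmtAPI and the package name are illustrative and not part of the patch.

package envtestsketch // illustrative package, not part of the patch

import (
	"net/http"
	"net/http/httptest"
	"strconv"
	"strings"

	corev1 "k8s.io/api/core/v1"
)

// wireFakeMgmtAPI starts an httptest server on an ephemeral port and rewrites
// the pod's "mgmt-api-http" named port so BuildPodHostFromPod resolves to the
// fake instead of the default 8080. The real controller additionally points
// the liveness and readiness probes at the same port.
func wireFakeMgmtAPI(pod *corev1.Pod) (*httptest.Server, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// internal/envtest/fake_mgmtapi.go dispatches on r.Method and
		// r.URL.Path; a blanket 200 is enough for this sketch.
		w.WriteHeader(http.StatusOK)
	}))

	ipPort := strings.Split(srv.Listener.Addr().String(), ":")
	port, err := strconv.Atoi(ipPort[1])
	if err != nil {
		srv.Close()
		return nil, err
	}

	for c := range pod.Spec.Containers {
		if pod.Spec.Containers[c].Name != "cassandra" {
			continue
		}
		for p := range pod.Spec.Containers[c].Ports {
			if pod.Spec.Containers[c].Ports[p].Name == "mgmt-api-http" {
				pod.Spec.Containers[c].Ports[p].ContainerPort = int32(port)
			}
		}
	}
	return srv, nil
}

The real FakeServer also appends each node to a shared CassMetadataEndpoints list, so /api/v0/metadata/endpoints reflects every fake pod in the cluster and the decommission endpoint can flip a node's status to StatusLeft.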
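
Most of the envtest speed-up comes from turning fixed delays into package-level variables (cooldownPeriod and minimumRequeueTime in the datacenter controller, result.DurationFunc, reconciliation.QuietDurationFunc, control.JobRunningRequeue, control.TaskRunningRequeue) that the suite shrinks in BeforeSuite. A minimal sketch of the pattern with an illustrative package name; production defaults stay as in the patch while tests overwrite the vars:

package requeuesketch // illustrative package, not part of the patch

import (
	"time"

	ctrl "sigs.k8s.io/controller-runtime"
)

// Variables rather than constants so a test suite can override them, e.g.
//   cooldownPeriod = 1 * time.Millisecond
//   minimumRequeueTime = 10 * time.Millisecond
var (
	cooldownPeriod     = 20 * time.Second
	minimumRequeueTime = 500 * time.Millisecond
)

// clampRequeue mirrors the change in cassandradatacenter_controller.go: when a
// requeue was requested, never schedule it sooner than the configured minimum.
func clampRequeue(res ctrl.Result) ctrl.Result {
	if res.Requeue && res.RequeueAfter < minimumRequeueTime {
		res.RequeueAfter = minimumRequeueTime
	}
	return res
}

// cooldownRemaining mirrors the quiet-period check: how long until the
// cooldown that began at lastNodeStart has elapsed (<= 0 means proceed).
func cooldownRemaining(lastNodeStart time.Time) time.Duration {
	return time.Until(lastNodeStart.Add(cooldownPeriod))
}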
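
verifyDatacenterDeleted works around envtest running no garbage collector: children of a deleted CassandraDatacenter are never actually removed, so the test asserts instead that each child carries an owner reference a real cluster's GC would act on (PVCs, which the operator deletes itself, are checked for a deletion timestamp). The same idea as a small helper; wouldBeGarbageCollected is an illustrative name, not in the patch:

package envtestsketch // illustrative package, not part of the patch

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// wouldBeGarbageCollected reports whether obj is owned by the named
// CassandraDatacenter, i.e. whether a real cluster's garbage collector would
// remove it once the datacenter is gone. Envtest never runs that collector,
// so an ownership check is the strongest assertion available there.
func wouldBeGarbageCollected(obj metav1.Object, dcName string) bool {
	for _, ref := range obj.GetOwnerReferences() {
		if ref.Kind == "CassandraDatacenter" && ref.Name == dcName {
			return true
		}
	}
	return false
}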
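
The last commit-message bullet ("Kubernetes has precision of 1s in the metav1.Time") is easy to trip over: a timestamp survives the round trip through the API server only to whole seconds, so comparing it against an in-memory time.Now() value can fail spuriously. A small standalone demonstration of the serialization behaviour:

package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	started := time.Now()

	// metav1.Time marshals as RFC3339 with one-second precision, so the
	// sub-second part is dropped whenever the object is written to the API server.
	data, _ := metav1.NewTime(started).MarshalJSON()

	var restored metav1.Time
	_ = restored.UnmarshalJSON(data)

	fmt.Println(started.Equal(restored.Time))                       // false whenever started has a sub-second part
	fmt.Println(started.Truncate(time.Second).Equal(restored.Time)) // true
}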