diff --git a/config/samples/logstash/logstash_svc.yaml b/config/samples/logstash/logstash_svc.yaml index 410caae8d3..cdbae09dd3 100644 --- a/config/samples/logstash/logstash_svc.yaml +++ b/config/samples/logstash/logstash_svc.yaml @@ -20,6 +20,7 @@ spec: config: log.level: info api.http.host: "0.0.0.0" + api.http.port: 9601 queue.type: memory services: - name: api @@ -27,10 +28,10 @@ spec: spec: type: ClusterIP ports: - - port: 9600 + - port: 9601 name: "api" protocol: TCP - targetPort: 9600 + targetPort: 9601 - name: beats service: spec: diff --git a/pkg/controller/logstash/config_test.go b/pkg/controller/logstash/config_test.go new file mode 100644 index 0000000000..ff7a92beb4 --- /dev/null +++ b/pkg/controller/logstash/config_test.go @@ -0,0 +1,150 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package logstash + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" + + commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1" + "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/watches" + "github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s" +) + +func Test_newConfig(t *testing.T) { + type args struct { + runtimeObjs []runtime.Object + logstash v1alpha1.Logstash + } + tests := []struct { + name string + args args + want string + wantErr bool + }{ + { + name: "no user config", + args: args{ + runtimeObjs: nil, + logstash: v1alpha1.Logstash{}, + }, + want: `api: + http: + host: 0.0.0.0 +`, + wantErr: false, + }, + { + name: "inline user config", + args: args{ + runtimeObjs: nil, + logstash: v1alpha1.Logstash{ + Spec: v1alpha1.LogstashSpec{Config: &commonv1.Config{Data: map[string]interface{}{ + "log.level": "debug", + }}}, + }, + }, + want: `api: + http: + host: 0.0.0.0 +log: + level: debug +`, + wantErr: false, + }, + { + name: "with configRef", + args: args{ + runtimeObjs: []runtime.Object{secretWithConfig("cfg", []byte("log.level: debug"))}, + logstash: logstashWithConfigRef("cfg", nil), + }, + want: `api: + http: + host: 0.0.0.0 +log: + level: debug +`, + wantErr: false, + }, + { + name: "config takes precedence", + args: args{ + runtimeObjs: []runtime.Object{secretWithConfig("cfg", []byte("log.level: debug"))}, + logstash: logstashWithConfigRef("cfg", &commonv1.Config{Data: map[string]interface{}{ + "log.level": "warn", + }}), + }, + want: `api: + http: + host: 0.0.0.0 +log: + level: warn +`, + wantErr: false, + }, + { + name: "non existing configRef", + args: args{ + logstash: logstashWithConfigRef("cfg", nil), + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + params := Params{ + Context: context.Background(), + Client: k8s.NewFakeClient(tt.args.runtimeObjs...), + EventRecorder: record.NewFakeRecorder(10), + Watches: watches.NewDynamicWatches(), + Logstash: tt.args.logstash, + } + + got, err := buildConfig(params) + if (err != nil) != tt.wantErr { + t.Errorf("newConfig() error = %v, wantErr %v", err, tt.wantErr) + return + } + if tt.wantErr { + return // no point in checking the config contents + } + require.NoError(t, err) + if string(got) != tt.want { + t.Errorf("newConfig() got = \n%v\n, want \n%v\n", string(got), tt.want) + } + }) + } +} + +func secretWithConfig(name string, cfg []byte) *corev1.Secret { + return &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: name, + }, + Data: map[string][]byte{ + LogstashConfigFileName: cfg, + }, + } +} + +func logstashWithConfigRef(name string, cfg *commonv1.Config) v1alpha1.Logstash { + return v1alpha1.Logstash{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ls", + Namespace: "ns", + }, + Spec: v1alpha1.LogstashSpec{ + Config: cfg, + ConfigRef: &commonv1.ConfigSource{SecretRef: commonv1.SecretRef{SecretName: name}}}, + } +} \ No newline at end of file diff --git a/pkg/controller/logstash/logstash_controller_test.go b/pkg/controller/logstash/logstash_controller_test.go new file mode 100644 index 0000000000..d8647f42e8 --- /dev/null +++ b/pkg/controller/logstash/logstash_controller_test.go @@ -0,0 +1,465 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package logstash + +import ( + "context" + "reflect" + "testing" + + "github.com/stretchr/testify/assert" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1" + logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/comparison" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/hash" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/watches" + "github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s" +) + +func newReconcileLogstash(objs ...runtime.Object) *ReconcileLogstash { + r := &ReconcileLogstash{ + Client: k8s.NewFakeClient(objs...), + recorder: record.NewFakeRecorder(100), + dynamicWatches: watches.NewDynamicWatches(), + } + return r +} + +func TestReconcileLogstash_Reconcile(t *testing.T) { + defaultLabels := (&logstashv1alpha1.Logstash{ObjectMeta: metav1.ObjectMeta{Name: "testLogstash"}}).GetIdentityLabels() + tests := []struct { + name string + objs []runtime.Object + request reconcile.Request + want reconcile.Result + expected logstashv1alpha1.Logstash + expectedObjects expectedObjects + wantErr bool + }{ + { + name: "valid unmanaged Logstash does not increment observedGeneration", + objs: []runtime.Object{ + &logstashv1alpha1.Logstash{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testLogstash", + Namespace: "test", + Generation: 1, + Annotations: map[string]string{ + common.ManagedAnnotation: "false", + }, + }, + Spec: logstashv1alpha1.LogstashSpec{ + Version: "8.6.1", + }, + Status: logstashv1alpha1.LogstashStatus{ + ObservedGeneration: 1, + }, + }, + }, + request: reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: "test", + Name: "testLogstash", + }, + }, + want: reconcile.Result{}, + expected: logstashv1alpha1.Logstash{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testLogstash", + Namespace: "test", + Generation: 1, + Annotations: map[string]string{ + common.ManagedAnnotation: "false", + }, + }, + Spec: logstashv1alpha1.LogstashSpec{ + Version: "8.6.1", + }, + Status: logstashv1alpha1.LogstashStatus{ + ObservedGeneration: 1, + }, + }, + expectedObjects: []expectedObject{}, + wantErr: false, + }, + { + name: "too long name fails validation, and updates observedGeneration", + objs: []runtime.Object{ + &logstashv1alpha1.Logstash{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testLogstashwithtoolongofanamereallylongname", + Namespace: "test", + Generation: 2, + }, + Status: logstashv1alpha1.LogstashStatus{ + ObservedGeneration: 1, + }, + }, + }, + request: reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: "test", + Name: "testLogstashwithtoolongofanamereallylongname", + }, + }, + want: reconcile.Result{}, + expected: logstashv1alpha1.Logstash{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testLogstashwithtoolongofanamereallylongname", + Namespace: "test", + Generation: 2, + }, + Status: logstashv1alpha1.LogstashStatus{ + ObservedGeneration: 2, + }, + }, + expectedObjects: []expectedObject{}, + wantErr: true, + }, + { + name: "Logstash with ready StatefulSet and Pod updates status and creates secrets and service", + objs: []runtime.Object{ + &logstashv1alpha1.Logstash{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testLogstash", + Namespace: "test", + Generation: 2, + }, + Spec: logstashv1alpha1.LogstashSpec{ + Version: "8.6.1", + Count: 1, + }, + Status: logstashv1alpha1.LogstashStatus{ + ObservedGeneration: 1, + }, + }, + &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testLogstash-ls", + Namespace: "test", + Labels: addLabel(defaultLabels, hash.TemplateHashLabelName, "3145706383"), + }, + Status: appsv1.StatefulSetStatus{ + AvailableReplicas: 1, + Replicas: 1, + ReadyReplicas: 1, + }, + }, + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testLogstash-ls", + Namespace: "test", + Generation: 2, + Labels: map[string]string{NameLabelName: "testLogstash", VersionLabelName: "8.6.1"}, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + }, + request: reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: "test", + Name: "testLogstash", + }, + }, + want: reconcile.Result{}, + expectedObjects: []expectedObject{ + { + t: &corev1.Service{}, + name: types.NamespacedName{Namespace: "test", Name: "testLogstash-ls-api"}, + }, + { + t: &corev1.Secret{}, + name: types.NamespacedName{Namespace: "test", Name: "testLogstash-ls-config"}, + }, + { + t: &corev1.Secret{}, + name: types.NamespacedName{Namespace: "test", Name: "testLogstash-ls-pipeline"}, + }, + }, + + expected: logstashv1alpha1.Logstash{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testLogstash", + Namespace: "test", + Generation: 2, + }, + Spec: logstashv1alpha1.LogstashSpec{ + Version: "8.6.1", + Count: 1, + }, + Status: logstashv1alpha1.LogstashStatus{ + Version: "8.6.1", + ExpectedNodes: 1, + AvailableNodes: 1, + ObservedGeneration: 2, + }, + }, + wantErr: false, + }, + { + name: "Logstash with a custom service creates secrets and service", + objs: []runtime.Object{ + &logstashv1alpha1.Logstash{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testLogstash", + Namespace: "test", + Generation: 2, + }, + Spec: logstashv1alpha1.LogstashSpec{ + Version: "8.6.1", + Count: 1, + Services: []logstashv1alpha1.LogstashService{{ + Name: "test", + Service: commonv1.ServiceTemplate{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {Protocol: "TCP", Port: 9500}, + }, + }, + }, + }}, + }, + Status: logstashv1alpha1.LogstashStatus{ + ObservedGeneration: 1, + }, + }, + &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testLogstash-ls", + Namespace: "test", + Labels: addLabel(defaultLabels, hash.TemplateHashLabelName, "3145706383"), + }, + Status: appsv1.StatefulSetStatus{ + AvailableReplicas: 1, + Replicas: 1, + ReadyReplicas: 1, + }, + }, + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testLogstash-ls", + Namespace: "test", + Generation: 2, + Labels: map[string]string{NameLabelName: "testLogstash", VersionLabelName: "8.6.1"}, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + }, + request: reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: "test", + Name: "testLogstash", + }, + }, + want: reconcile.Result{}, + expectedObjects: []expectedObject{ + { + t: &corev1.Service{}, + name: types.NamespacedName{Namespace: "test", Name: "testLogstash-ls-api"}, + }, + { + t: &corev1.Service{}, + name: types.NamespacedName{Namespace: "test", Name: "testLogstash-ls-test"}, + }, + { + t: &corev1.Secret{}, + name: types.NamespacedName{Namespace: "test", Name: "testLogstash-ls-config"}, + }, + { + t: &corev1.Secret{}, + name: types.NamespacedName{Namespace: "test", Name: "testLogstash-ls-pipeline"}, + }, + }, + + expected: logstashv1alpha1.Logstash{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testLogstash", + Namespace: "test", + Generation: 2, + }, + Spec: logstashv1alpha1.LogstashSpec{ + Version: "8.6.1", + Count: 1, + Services: []logstashv1alpha1.LogstashService{{ + Name: "test", + Service: commonv1.ServiceTemplate{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {Protocol: "TCP", Port: 9500}, + }, + }, + }, + }}, + }, + Status: logstashv1alpha1.LogstashStatus{ + Version: "8.6.1", + ExpectedNodes: 1, + AvailableNodes: 1, + ObservedGeneration: 2, + }, + }, + wantErr: false, + }, + { + name: "Logstash with a service with no port creates secrets and service", + objs: []runtime.Object{ + &logstashv1alpha1.Logstash{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testLogstash", + Namespace: "test", + Generation: 2, + }, + Spec: logstashv1alpha1.LogstashSpec{ + Version: "8.6.1", + Count: 1, + Services: []logstashv1alpha1.LogstashService{{ + Name: "api", + Service: commonv1.ServiceTemplate{ + Spec: corev1.ServiceSpec{ + Ports: nil, + }, + }, + }}, + }, + Status: logstashv1alpha1.LogstashStatus{ + ObservedGeneration: 1, + }, + }, + &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testLogstash-ls", + Namespace: "test", + Labels: addLabel(defaultLabels, hash.TemplateHashLabelName, "3145706383"), + }, + Status: appsv1.StatefulSetStatus{ + AvailableReplicas: 1, + Replicas: 1, + ReadyReplicas: 1, + }, + }, + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testLogstash-ls", + Namespace: "test", + Generation: 2, + Labels: map[string]string{NameLabelName: "testLogstash", VersionLabelName: "8.6.1"}, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + }, + request: reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: "test", + Name: "testLogstash", + }, + }, + want: reconcile.Result{}, + expectedObjects: []expectedObject{ + { + t: &corev1.Service{}, + name: types.NamespacedName{Namespace: "test", Name: "testLogstash-ls-api"}, + }, + { + t: &corev1.Secret{}, + name: types.NamespacedName{Namespace: "test", Name: "testLogstash-ls-config"}, + }, + { + t: &corev1.Secret{}, + name: types.NamespacedName{Namespace: "test", Name: "testLogstash-ls-pipeline"}, + }, + }, + + expected: logstashv1alpha1.Logstash{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testLogstash", + Namespace: "test", + Generation: 2, + }, + Spec: logstashv1alpha1.LogstashSpec{ + Version: "8.6.1", + Count: 1, + Services: []logstashv1alpha1.LogstashService{{ + Name: "api", + Service: commonv1.ServiceTemplate{ + Spec: corev1.ServiceSpec{ + Ports: nil, + }, + }, + }}, + }, + Status: logstashv1alpha1.LogstashStatus{ + Version: "8.6.1", + ExpectedNodes: 1, + AvailableNodes: 1, + ObservedGeneration: 2, + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := newReconcileLogstash(tt.objs...) + got, err := r.Reconcile(context.Background(), tt.request) + if (err != nil) != tt.wantErr { + t.Errorf("ReconcileLogstash.Reconcile() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("ReconcileLogstash.Reconcile() = %v, want %v", got, tt.want) + } + + var Logstash logstashv1alpha1.Logstash + if err := r.Client.Get(context.Background(), tt.request.NamespacedName, &Logstash); err != nil { + t.Error(err) + return + } + tt.expectedObjects.assertExist(t, r.Client) + comparison.AssertEqual(t, &Logstash, &tt.expected) + }) + } +} + +func addLabel(labels map[string]string, key, value string) map[string]string { + newLabels := make(map[string]string, len(labels)) + for k, v := range labels { + newLabels[k] = v + } + newLabels[key] = value + return newLabels +} + +type expectedObject struct { + t client.Object + name types.NamespacedName +} + +type expectedObjects []expectedObject + +func (e expectedObjects) assertExist(t *testing.T, k8s client.Client) { + t.Helper() + for _, o := range e { + obj := o.t.DeepCopyObject().(client.Object) //nolint:forcetypeassert + assert.NoError(t, k8s.Get(context.Background(), o.name, obj), "Expected object not found: %s", o.name) + } +} \ No newline at end of file diff --git a/pkg/controller/logstash/pod.go b/pkg/controller/logstash/pod.go index af9c6d53f2..2bd3a6c8b2 100644 --- a/pkg/controller/logstash/pod.go +++ b/pkg/controller/logstash/pod.go @@ -91,8 +91,9 @@ func buildPodTemplate(params Params, configHash hash.Hash32) corev1.PodTemplateS WithDockerImage(spec.Image, container.ImageRepository(container.LogstashImage, spec.Version)). WithAutomountServiceAccountToken(). WithPorts(ports). - WithReadinessProbe(readinessProbe(false)). - WithVolumeLikes(vols...) + WithReadinessProbe(readinessProbe(params.Logstash)). + WithVolumeLikes(vols...). + WithInitContainerDefaults() builder, err := stackmon.WithMonitoring(params.Context, params.Client, builder, params.Logstash) if err != nil { @@ -117,12 +118,15 @@ func getDefaultContainerPorts() []corev1.ContainerPort { } // readinessProbe is the readiness probe for the Logstash container -func readinessProbe(useTLS bool) corev1.Probe { - scheme := corev1.URISchemeHTTP - if useTLS { - scheme = corev1.URISchemeHTTPS +func readinessProbe(logstash logstashv1alpha1.Logstash) corev1.Probe { + var scheme = corev1.URISchemeHTTP + var port = network.HTTPPort + for _, service := range logstash.Spec.Services { + if service.Name == LogstashAPIServiceName && len(service.Service.Spec.Ports) > 0 { + port = int(service.Service.Spec.Ports[0].Port) + } } - return corev1.Probe{ + probe := corev1.Probe{ FailureThreshold: 3, InitialDelaySeconds: 30, PeriodSeconds: 10, @@ -130,10 +134,11 @@ func readinessProbe(useTLS bool) corev1.Probe { TimeoutSeconds: 5, ProbeHandler: corev1.ProbeHandler{ HTTPGet: &corev1.HTTPGetAction{ - Port: intstr.FromInt(network.HTTPPort), + Port: intstr.FromInt(port), Path: "/", Scheme: scheme, }, }, } -} + return probe +} \ No newline at end of file diff --git a/pkg/controller/logstash/pod_test.go b/pkg/controller/logstash/pod_test.go new file mode 100644 index 0000000000..60fc57cb7f --- /dev/null +++ b/pkg/controller/logstash/pod_test.go @@ -0,0 +1,285 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package logstash + +import ( + "context" + "testing" + + "hash/fnv" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1" + logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/container" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/pod" + "github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s" +) + +func TestNewPodTemplateSpec(t *testing.T) { + tests := []struct { + name string + logstash logstashv1alpha1.Logstash + assertions func(pod corev1.PodTemplateSpec) + }{ + { + name: "defaults", + logstash: logstashv1alpha1.Logstash{ + Spec: logstashv1alpha1.LogstashSpec{ + Version: "8.6.1", + }, + }, + assertions: func(pod corev1.PodTemplateSpec) { + assert.Equal(t, false, *pod.Spec.AutomountServiceAccountToken) + assert.Len(t, pod.Spec.Containers, 1) + assert.Len(t, pod.Spec.InitContainers, 0) + assert.Len(t, pod.Spec.Volumes, 2) + assert.NotEmpty(t, pod.Annotations[ConfigHashAnnotationName]) + logstashContainer := GetLogstashContainer(pod.Spec) + require.NotNil(t, logstashContainer) + assert.Equal(t, 2, len(logstashContainer.VolumeMounts)) + assert.Equal(t, container.ImageRepository(container.LogstashImage, "8.6.1"), logstashContainer.Image) + assert.NotNil(t, logstashContainer.ReadinessProbe) + assert.NotEmpty(t, logstashContainer.Ports) + }, + }, + { + name: "with custom image", + logstash: logstashv1alpha1.Logstash{Spec: logstashv1alpha1.LogstashSpec{ + Image: "my-custom-image:1.0.0", + Version: "8.6.1", + }}, + assertions: func(pod corev1.PodTemplateSpec) { + assert.Equal(t, "my-custom-image:1.0.0", GetLogstashContainer(pod.Spec).Image) + }, + }, + { + name: "with default resources", + logstash: logstashv1alpha1.Logstash{Spec: logstashv1alpha1.LogstashSpec{ + Version: "8.6.1", + }}, + assertions: func(pod corev1.PodTemplateSpec) { + assert.Equal(t, DefaultResources, GetLogstashContainer(pod.Spec).Resources) + }, + }, + { + name: "with user-provided resources", + logstash: logstashv1alpha1.Logstash{Spec: logstashv1alpha1.LogstashSpec{ + Version: "8.6.1", + PodTemplate: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "logstash", + Resources: corev1.ResourceRequirements{ + Limits: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceMemory: resource.MustParse("3Gi"), + }, + }, + }, + }, + }, + }, + }}, + assertions: func(pod corev1.PodTemplateSpec) { + assert.Equal(t, corev1.ResourceRequirements{ + Limits: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceMemory: resource.MustParse("3Gi"), + }, + }, GetLogstashContainer(pod.Spec).Resources) + }, + }, + { + name: "with user-provided init containers", + logstash: logstashv1alpha1.Logstash{Spec: logstashv1alpha1.LogstashSpec{ + PodTemplate: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + InitContainers: []corev1.Container{ + { + Name: "user-init-container", + }, + }, + }, + }, + }}, + assertions: func(pod corev1.PodTemplateSpec) { + assert.Len(t, pod.Spec.InitContainers, 1) + assert.Equal(t, pod.Spec.Containers[0].Image, pod.Spec.InitContainers[0].Image) + }, + }, + { + name: "with user-provided labels", + logstash: logstashv1alpha1.Logstash{ + ObjectMeta: metav1.ObjectMeta{ + Name: "logstash-name", + }, + Spec: logstashv1alpha1.LogstashSpec{ + PodTemplate: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "label1": "value1", + "label2": "value2", + NameLabelName: "overridden-logstash-name", + }, + }, + }, + Version: "8.6.1", + }}, + assertions: func(pod corev1.PodTemplateSpec) { + labels := (&logstashv1alpha1.Logstash{ObjectMeta: metav1.ObjectMeta{Name: "logstash-name"}}).GetIdentityLabels() + labels[VersionLabelName] = "8.6.1" + labels["label1"] = "value1" + labels["label2"] = "value2" + labels[NameLabelName] = "overridden-logstash-name" + assert.Equal(t, labels, pod.Labels) + }, + }, + { + name: "with user-provided ENV variable", + logstash: logstashv1alpha1.Logstash{Spec: logstashv1alpha1.LogstashSpec{ + PodTemplate: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "logstash", + Env: []corev1.EnvVar{ + { + Name: "user-env", + Value: "user-env-value", + }, + }, + }, + }, + }, + }, + }}, + assertions: func(pod corev1.PodTemplateSpec) { + assert.Len(t, GetLogstashContainer(pod.Spec).Env, 1) + }, + }, + { + name: "with multiple services, readiness probe hits the correct port", + logstash: logstashv1alpha1.Logstash{ + Spec: logstashv1alpha1.LogstashSpec{ + Version: "8.6.1", + Services: []logstashv1alpha1.LogstashService{{ + Name: LogstashAPIServiceName, + Service: commonv1.ServiceTemplate{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {Name: "api", Protocol: "TCP", Port: 9200}, + }, + }, + }}, { + Name: "notapi", + Service: commonv1.ServiceTemplate{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {Name: "notapi", Protocol: "TCP", Port: 9600}, + }, + }, + }}, + }, + }, + }, + assertions: func(pod corev1.PodTemplateSpec) { + assert.Equal(t, 9200, GetLogstashContainer(pod.Spec).ReadinessProbe.HTTPGet.Port.IntValue()) + }, + }, + { + name: "with api service customized, readiness probe hits the correct port", + logstash: logstashv1alpha1.Logstash{ + Spec: logstashv1alpha1.LogstashSpec{ + Version: "8.6.1", + Services: []logstashv1alpha1.LogstashService{ + { + Name: LogstashAPIServiceName, + Service: commonv1.ServiceTemplate{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {Name: "api", Protocol: "TCP", Port: 9200}, + }, + }, + }}, + }, + }}, + assertions: func(pod corev1.PodTemplateSpec) { + assert.Equal(t, 9200, GetLogstashContainer(pod.Spec).ReadinessProbe.HTTPGet.Port.IntValue()) + }, + }, + { + name: "with default service, readiness probe hits the correct port", + logstash: logstashv1alpha1.Logstash{ + Spec: logstashv1alpha1.LogstashSpec{ + Version: "8.6.1", + }}, + assertions: func(pod corev1.PodTemplateSpec) { + assert.Equal(t, 9600, GetLogstashContainer(pod.Spec).ReadinessProbe.HTTPGet.Port.IntValue()) + }, + }, + + { + name: "with custom annotation", + logstash: logstashv1alpha1.Logstash{Spec: logstashv1alpha1.LogstashSpec{ + Image: "my-custom-image:1.0.0", + Version: "8.6.1", + }}, + assertions: func(pod corev1.PodTemplateSpec) { + assert.Equal(t, "my-custom-image:1.0.0", GetLogstashContainer(pod.Spec).Image) + }, + }, + { + name: "with user-provided volumes and volume mounts", + logstash: logstashv1alpha1.Logstash{Spec: logstashv1alpha1.LogstashSpec{ + PodTemplate: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "logstash", + VolumeMounts: []corev1.VolumeMount{ + { + Name: "user-volume-mount", + }, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "user-volume", + }, + }, + }, + }, + }}, + assertions: func(pod corev1.PodTemplateSpec) { + assert.Len(t, pod.Spec.Volumes, 3) + assert.Len(t, GetLogstashContainer(pod.Spec).VolumeMounts, 3) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + params := Params{ + Context: context.Background(), + Client: k8s.NewFakeClient(), + Logstash: tt.logstash, + } + configHash := fnv.New32a() + got := buildPodTemplate(params, configHash) + tt.assertions(got) + }) + } +} + +// GetLogstashContainer returns the Logstash container from the given podSpec. +func GetLogstashContainer(podSpec corev1.PodSpec) *corev1.Container { + return pod.ContainerByName(podSpec, logstashv1alpha1.LogstashContainerName) +} diff --git a/pkg/controller/logstash/service.go b/pkg/controller/logstash/service.go index 7348e7970e..9f1bea3879 100644 --- a/pkg/controller/logstash/service.go +++ b/pkg/controller/logstash/service.go @@ -14,6 +14,10 @@ import ( "github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/network" ) +const ( + LogstashAPIServiceName = "api" +) + // reconcileServices reconcile Services defined in spec // // When a service is defined that matches the API service name, then that service is used to define @@ -23,14 +27,11 @@ func reconcileServices(params Params) ([]corev1.Service, error) { svcs := make([]corev1.Service, 0, len(params.Logstash.Spec.Services)+1) for _, service := range params.Logstash.Spec.Services { - var svc *corev1.Service logstash := params.Logstash if logstashv1alpha1.UserServiceName(logstash.Name, service.Name) == logstashv1alpha1.APIServiceName(logstash.Name) { - svc = newAPIService(params.Logstash) createdAPIService = true - } else { - svc = newService(service, params.Logstash) } + svc := newService(service, params.Logstash) if err := reconcileService(params, svc); err != nil { return []corev1.Service{}, err } @@ -87,10 +88,10 @@ func newAPIService(logstash logstashv1alpha1.Logstash) *corev1.Service { labels := NewLabels(logstash) ports := []corev1.ServicePort{ { - Name: "metrics", + Name: LogstashAPIServiceName, Protocol: corev1.ProtocolTCP, Port: network.HTTPPort, }, } return defaults.SetServiceDefaults(&svc, labels, labels, ports) -} +} \ No newline at end of file diff --git a/pkg/controller/logstash/service_test.go b/pkg/controller/logstash/service_test.go new file mode 100644 index 0000000000..90123b0dc5 --- /dev/null +++ b/pkg/controller/logstash/service_test.go @@ -0,0 +1,216 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package logstash + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1" + logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/comparison" + "github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s" +) + +func TestReconcileServices(t *testing.T) { + trueVal := true + testCases := []struct { + name string + logstash logstashv1alpha1.Logstash + wantSvc []corev1.Service + }{ + { + name: "default service", + logstash: logstashv1alpha1.Logstash{ + ObjectMeta: metav1.ObjectMeta{ + Name: "logstash", + Namespace: "test", + }, + }, + wantSvc: []corev1.Service{{ + ObjectMeta: metav1.ObjectMeta{ + Name: "logstash-ls-api", + Namespace: "test", + Labels: map[string]string{ + "common.k8s.elastic.co/type": "logstash", + "logstash.k8s.elastic.co/name": "logstash", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "logstash.k8s.elastic.co/v1alpha1", + Kind: "Logstash", + Name: "logstash", + Controller: &trueVal, + BlockOwnerDeletion: &trueVal, + }, + }, + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{ + "common.k8s.elastic.co/type": "logstash", + "logstash.k8s.elastic.co/name": "logstash", + }, + ClusterIP: "None", + Ports: []corev1.ServicePort{ + {Name: "api", Protocol: "TCP", Port: 9600}, + }, + }, + }}, + }, + { + name: "Changed port on default service", + logstash: logstashv1alpha1.Logstash{ + ObjectMeta: metav1.ObjectMeta{ + Name: "logstash", + Namespace: "test", + }, + Spec: logstashv1alpha1.LogstashSpec{ + Services: []logstashv1alpha1.LogstashService{{ + Name: LogstashAPIServiceName, + Service: commonv1.ServiceTemplate{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {Name: LogstashAPIServiceName, Protocol: "TCP", Port: 9200}, + }, + }, + }, + }}, + }, + }, + wantSvc: []corev1.Service{{ + ObjectMeta: metav1.ObjectMeta{ + Name: "logstash-ls-api", + Namespace: "test", + Labels: map[string]string{ + "common.k8s.elastic.co/type": "logstash", + "logstash.k8s.elastic.co/name": "logstash", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "logstash.k8s.elastic.co/v1alpha1", + Kind: "Logstash", + Name: "logstash", + Controller: &trueVal, + BlockOwnerDeletion: &trueVal, + }, + }, + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{ + "common.k8s.elastic.co/type": "logstash", + "logstash.k8s.elastic.co/name": "logstash", + }, + ClusterIP: "", + Ports: []corev1.ServicePort{ + {Name: "api", Protocol: "TCP", Port: 9200}, + }, + }, + }}, + }, + { + name: "Default service plus one", + logstash: logstashv1alpha1.Logstash{ + ObjectMeta: metav1.ObjectMeta{ + Name: "logstash", + Namespace: "test", + }, + Spec: logstashv1alpha1.LogstashSpec{ + Services: []logstashv1alpha1.LogstashService{{ + Name: "test", + Service: commonv1.ServiceTemplate{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {Protocol: "TCP", Port: 9500}, + }, + }, + }, + }}, + }, + }, + wantSvc: []corev1.Service{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "logstash-ls-test", + Namespace: "test", + Labels: map[string]string{ + "common.k8s.elastic.co/type": "logstash", + "logstash.k8s.elastic.co/name": "logstash", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "logstash.k8s.elastic.co/v1alpha1", + Kind: "Logstash", + Name: "logstash", + Controller: &trueVal, + BlockOwnerDeletion: &trueVal, + }, + }, + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{ + "common.k8s.elastic.co/type": "logstash", + "logstash.k8s.elastic.co/name": "logstash", + }, + Ports: []corev1.ServicePort{ + {Protocol: "TCP", Port: 9500}, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "logstash-ls-api", + Namespace: "test", + Labels: map[string]string{ + "common.k8s.elastic.co/type": "logstash", + "logstash.k8s.elastic.co/name": "logstash", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "logstash.k8s.elastic.co/v1alpha1", + Kind: "Logstash", + Name: "logstash", + Controller: &trueVal, + BlockOwnerDeletion: &trueVal, + }, + }, + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{ + "common.k8s.elastic.co/type": "logstash", + "logstash.k8s.elastic.co/name": "logstash", + }, + ClusterIP: "None", + Ports: []corev1.ServicePort{ + {Name: "api", Protocol: "TCP", Port: 9600}, + }, + }, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + client := k8s.NewFakeClient() + params := Params{ + Context: context.Background(), + Client: client, + Logstash: tc.logstash, + } + haveSvc, err := reconcileServices(params) + require.NoError(t, err) + require.Equal(t, len(tc.wantSvc), len(haveSvc)) + + for i := range tc.wantSvc { + comparison.AssertEqual(t, &tc.wantSvc[i], &haveSvc[i]) + } + }) + } +} diff --git a/test/e2e/test/logstash/http_client.go b/test/e2e/test/logstash/http_client.go index b9456d0153..c5b271c242 100644 --- a/test/e2e/test/logstash/http_client.go +++ b/test/e2e/test/logstash/http_client.go @@ -13,6 +13,8 @@ import ( "net/url" "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" + ls "github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/network" "github.com/elastic/cloud-on-k8s/v2/test/e2e/test" ) @@ -31,9 +33,16 @@ func NewLogstashClient(logstash v1alpha1.Logstash, k *test.K8sClient) (*http.Cli } func DoRequest(client *http.Client, logstash v1alpha1.Logstash, method, path string) ([]byte, error) { - scheme := "http" + var scheme = "http" + var port = network.HTTPPort + for _, service := range logstash.Spec.Services { + if service.Name == ls.LogstashAPIServiceName && len(service.Service.Spec.Ports) > 0 { + port = int(service.Service.Spec.Ports[0].Port) + } + } + + url, err := url.Parse(fmt.Sprintf("%s://%s.%s.svc:%d%s", scheme, v1alpha1.APIServiceName(logstash.Name), logstash.Namespace, port, path)) - url, err := url.Parse(fmt.Sprintf("%s://%s.%s.svc:9600%s", scheme, v1alpha1.APIServiceName(logstash.Name), logstash.Namespace, path)) if err != nil { return nil, fmt.Errorf("while parsing URL: %w", err) }