diff --git a/config/crds/v1/all-crds.yaml b/config/crds/v1/all-crds.yaml index 4f30dd94d3..ffa0083c17 100644 --- a/config/crds/v1/all-crds.yaml +++ b/config/crds/v1/all-crds.yaml @@ -9223,6 +9223,23 @@ spec: type: array type: object type: object + pipelines: + description: Pipelines holds the Logstash Pipelines. At most one of + [`Pipelines`, `PipelinesRef`] can be specified. + items: + type: object + type: array + x-kubernetes-preserve-unknown-fields: true + pipelinesRef: + description: PipelinesRef contains a reference to an existing Kubernetes + Secret holding the Logstash Pipelines. Logstash pipelines must be + specified as yaml, under a single "pipelines.yml" entry. At most + one of [`Pipelines`, `PipelinesRef`] can be specified. + properties: + secretName: + description: SecretName is the name of the secret. + type: string + type: object podTemplate: description: PodTemplate provides customisation options for the Logstash pods. diff --git a/config/crds/v1/bases/logstash.k8s.elastic.co_logstashes.yaml b/config/crds/v1/bases/logstash.k8s.elastic.co_logstashes.yaml index 5d330d10e2..f92a4a038c 100644 --- a/config/crds/v1/bases/logstash.k8s.elastic.co_logstashes.yaml +++ b/config/crds/v1/bases/logstash.k8s.elastic.co_logstashes.yaml @@ -179,6 +179,23 @@ spec: type: array type: object type: object + pipelines: + description: Pipelines holds the Logstash Pipelines. At most one of + [`Pipelines`, `PipelinesRef`] can be specified. + items: + type: object + type: array + x-kubernetes-preserve-unknown-fields: true + pipelinesRef: + description: PipelinesRef contains a reference to an existing Kubernetes + Secret holding the Logstash Pipelines. Logstash pipelines must be + specified as yaml, under a single "pipelines.yml" entry. At most + one of [`Pipelines`, `PipelinesRef`] can be specified. + properties: + secretName: + description: SecretName is the name of the secret. + type: string + type: object podTemplate: description: PodTemplate provides customisation options for the Logstash pods. diff --git a/config/samples/logstash/logstash.yaml b/config/samples/logstash/logstash.yaml index 850dd41d9c..fa49c576de 100644 --- a/config/samples/logstash/logstash.yaml +++ b/config/samples/logstash/logstash.yaml @@ -9,6 +9,9 @@ spec: log.level: info api.http.host: "0.0.0.0" queue.type: memory + pipelines: + - pipeline.id: main + config.string: input { exec { command => 'uptime' interval => 10 } } output { stdout{} } podTemplate: spec: containers: diff --git a/config/webhook/manifests.yaml b/config/webhook/manifests.yaml index 51bcda4259..2870ef1880 100644 --- a/config/webhook/manifests.yaml +++ b/config/webhook/manifests.yaml @@ -226,7 +226,6 @@ webhooks: - logstashes sideEffects: None - admissionReviewVersions: - - v1alpha1 - v1 - v1beta1 clientConfig: diff --git a/deploy/eck-operator/charts/eck-operator-crds/templates/all-crds.yaml b/deploy/eck-operator/charts/eck-operator-crds/templates/all-crds.yaml index 74932dc3e2..06f46ea7a2 100644 --- a/deploy/eck-operator/charts/eck-operator-crds/templates/all-crds.yaml +++ b/deploy/eck-operator/charts/eck-operator-crds/templates/all-crds.yaml @@ -9277,6 +9277,23 @@ spec: type: array type: object type: object + pipelines: + description: Pipelines holds the Logstash Pipelines. At most one of + [`Pipelines`, `PipelinesRef`] can be specified. + items: + type: object + type: array + x-kubernetes-preserve-unknown-fields: true + pipelinesRef: + description: PipelinesRef contains a reference to an existing Kubernetes + Secret holding the Logstash Pipelines. Logstash pipelines must be + specified as yaml, under a single "pipelines.yml" entry. At most + one of [`Pipelines`, `PipelinesRef`] can be specified. + properties: + secretName: + description: SecretName is the name of the secret. + type: string + type: object podTemplate: description: PodTemplate provides customisation options for the Logstash pods. diff --git a/docs/reference/api-docs.asciidoc b/docs/reference/api-docs.asciidoc index df6bf48238..040a1e3058 100644 --- a/docs/reference/api-docs.asciidoc +++ b/docs/reference/api-docs.asciidoc @@ -1897,6 +1897,8 @@ LogstashSpec defines the desired state of Logstash | *`image`* __string__ | Image is the Logstash Docker image to deploy. Version and Type have to match the Logstash in the image. | *`config`* __xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-common-v1-config[$$Config$$]__ | Config holds the Logstash configuration. At most one of [`Config`, `ConfigRef`] can be specified. | *`configRef`* __xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-common-v1-configsource[$$ConfigSource$$]__ | ConfigRef contains a reference to an existing Kubernetes Secret holding the Logstash configuration. Logstash settings must be specified as yaml, under a single "logstash.yml" entry. At most one of [`Config`, `ConfigRef`] can be specified. +| *`pipelines`* __xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-common-v1-config[$$Config$$] array__ | Pipelines holds the Logstash Pipelines. At most one of [`Pipelines`, `PipelinesRef`] can be specified. +| *`pipelinesRef`* __xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-common-v1-configsource[$$ConfigSource$$]__ | PipelinesRef contains a reference to an existing Kubernetes Secret holding the Logstash Pipelines. Logstash pipelines must be specified as yaml, under a single "pipelines.yml" entry. At most one of [`Pipelines`, `PipelinesRef`] can be specified. | *`services`* __xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-logstash-v1alpha1-logstashservice[$$LogstashService$$] array__ | Services contains details of services that Logstash should expose - similar to the HTTP layer configuration for the rest of the stack, but also applicable for more use cases than the metrics API, as logstash may need to be opened up for other services: beats, TCP, UDP, etc, inputs | *`monitoring`* __xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-common-v1-monitoring[$$Monitoring$$]__ | Monitoring enables you to collect and ship log and monitoring data of this Logstash. Metricbeat and Filebeat are deployed in the same Pod as sidecars and each one sends data to one or two different Elasticsearch monitoring clusters running in the same Kubernetes cluster. | *`podTemplate`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#podtemplatespec-v1-core[$$PodTemplateSpec$$]__ | PodTemplate provides customisation options for the Logstash pods. diff --git a/pkg/apis/logstash/v1alpha1/logstash_types.go b/pkg/apis/logstash/v1alpha1/logstash_types.go index 843cfae5df..bf595c3c3a 100644 --- a/pkg/apis/logstash/v1alpha1/logstash_types.go +++ b/pkg/apis/logstash/v1alpha1/logstash_types.go @@ -42,6 +42,17 @@ type LogstashSpec struct { // +kubebuilder:validation:Optional ConfigRef *commonv1.ConfigSource `json:"configRef,omitempty"` + // Pipelines holds the Logstash Pipelines. At most one of [`Pipelines`, `PipelinesRef`] can be specified. + // +kubebuilder:validation:Optional + // +kubebuilder:pruning:PreserveUnknownFields + Pipelines []commonv1.Config `json:"pipelines,omitempty"` + + // PipelinesRef contains a reference to an existing Kubernetes Secret holding the Logstash Pipelines. + // Logstash pipelines must be specified as yaml, under a single "pipelines.yml" entry. At most one of [`Pipelines`, `PipelinesRef`] + // can be specified. + // +kubebuilder:validation:Optional + PipelinesRef *commonv1.ConfigSource `json:"pipelinesRef,omitempty"` + // Services contains details of services that Logstash should expose - similar to the HTTP layer configuration for the // rest of the stack, but also applicable for more use cases than the metrics API, as logstash may need to // be opened up for other services: beats, TCP, UDP, etc, inputs diff --git a/pkg/apis/logstash/v1alpha1/name.go b/pkg/apis/logstash/v1alpha1/name.go index 88c56cd9ea..8b93b2cf3a 100644 --- a/pkg/apis/logstash/v1alpha1/name.go +++ b/pkg/apis/logstash/v1alpha1/name.go @@ -11,6 +11,7 @@ import ( const ( apiServiceSuffix = "api" configSuffix = "config" + pipelineSuffix = "pipeline" ) // Namer is a Namer that is configured with the defaults for resources related to a Logstash resource. @@ -34,3 +35,7 @@ func APIServiceName(name string) string { func UserServiceName(deployName string, name string) string { return Namer.Suffix(deployName, name) } + +func PipelineSecretName(name string) string { + return Namer.Suffix(name, pipelineSuffix) +} diff --git a/pkg/apis/logstash/v1alpha1/validations.go b/pkg/apis/logstash/v1alpha1/validations.go index 5cddf8ca6e..19d9620500 100644 --- a/pkg/apis/logstash/v1alpha1/validations.go +++ b/pkg/apis/logstash/v1alpha1/validations.go @@ -25,6 +25,7 @@ var ( checkSingleConfigSource, checkMonitoring, checkAssociations, + checkSinglePipelineSource, } updateChecks = []func(old, curr *Logstash) field.ErrorList{ @@ -73,3 +74,15 @@ func checkAssociations(l *Logstash) field.ErrorList { err2 := commonv1.CheckAssociationRefs(monitoringPath.Child("logs"), l.GetMonitoringLogsRefs()...) return append(err1, err2...) } + +func checkSinglePipelineSource(a *Logstash) field.ErrorList { + if a.Spec.Pipelines != nil && a.Spec.PipelinesRef != nil { + msg := "Specify at most one of [`pipelines`, `pipelinesRef`], not both" + return field.ErrorList{ + field.Forbidden(field.NewPath("spec").Child("pipelines"), msg), + field.Forbidden(field.NewPath("spec").Child("pipelinesRef"), msg), + } + } + + return nil +} diff --git a/pkg/apis/logstash/v1alpha1/validations_test.go b/pkg/apis/logstash/v1alpha1/validations_test.go index 08cd574aa4..d6ecd75d5a 100644 --- a/pkg/apis/logstash/v1alpha1/validations_test.go +++ b/pkg/apis/logstash/v1alpha1/validations_test.go @@ -145,6 +145,57 @@ func Test_checkSingleConfigSource(t *testing.T) { } } +func Test_checkSinglePipelineSource(t *testing.T) { + tests := []struct { + name string + logstash Logstash + wantErr bool + }{ + { + name: "pipelinesRef absent, pipelines present", + logstash: Logstash{ + Spec: LogstashSpec{ + Pipelines: []commonv1.Config{}, + }, + }, + wantErr: false, + }, + { + name: "pipelines absent, pipelinesRef present", + logstash: Logstash{ + Spec: LogstashSpec{ + PipelinesRef: &commonv1.ConfigSource{}, + }, + }, + wantErr: false, + }, + { + name: "neither present", + logstash: Logstash{ + Spec: LogstashSpec{}, + }, + wantErr: false, + }, + { + name: "both present", + logstash: Logstash{ + Spec: LogstashSpec{ + Pipelines: []commonv1.Config{}, + PipelinesRef: &commonv1.ConfigSource{}, + }, + }, + wantErr: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := checkSinglePipelineSource(&tc.logstash) + assert.Equal(t, tc.wantErr, len(got) > 0) + }) + } +} + func Test_checkSupportedVersion(t *testing.T) { for _, tt := range []struct { name string diff --git a/pkg/apis/logstash/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/logstash/v1alpha1/zz_generated.deepcopy.go index 0d921a9c67..7cbc8dd427 100644 --- a/pkg/apis/logstash/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/logstash/v1alpha1/zz_generated.deepcopy.go @@ -130,6 +130,18 @@ func (in *LogstashSpec) DeepCopyInto(out *LogstashSpec) { *out = new(v1.ConfigSource) **out = **in } + if in.Pipelines != nil { + in, out := &in.Pipelines, &out.Pipelines + *out = make([]v1.Config, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + if in.PipelinesRef != nil { + in, out := &in.PipelinesRef, &out.PipelinesRef + *out = new(v1.ConfigSource) + **out = **in + } if in.Services != nil { in, out := &in.Services, &out.Services *out = make([]LogstashService, len(*in)) diff --git a/pkg/controller/common/configref.go b/pkg/controller/common/configref.go index 0f100206b8..89961372b8 100644 --- a/pkg/controller/common/configref.go +++ b/pkg/controller/common/configref.go @@ -8,6 +8,9 @@ import ( "context" "fmt" + "github.com/elastic/go-ucfg" + uyaml "github.com/elastic/go-ucfg/yaml" + "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" @@ -34,6 +37,23 @@ func ParseConfigRef( configRef *commonv1.ConfigSource, secretKey string, // retrieve config data from that entry in the secret ) (*settings.CanonicalConfig, error) { + parsed, err := ParseConfigRefToConfig(driver, resource, configRef, secretKey, ConfigRefWatchName, settings.Options) + if err != nil { + return nil, err + } + return (*settings.CanonicalConfig)(parsed), nil +} + +// ParseConfigRefToConfig retrieves the content of a secret referenced in `configRef`, sets up dynamic watches for that secret, +// and parses the secret content into ucfg.Config. +func ParseConfigRefToConfig( + driver driver.Interface, + resource runtime.Object, // eg. Beat, EnterpriseSearch + configRef *commonv1.ConfigSource, + secretKey string, // retrieve config data from that entry in the secret + configRefWatchName func(types.NamespacedName) string, + configOptions []ucfg.Option, +) (*ucfg.Config, error) { resourceMeta, err := meta.Accessor(resource) if err != nil { return nil, err @@ -46,7 +66,7 @@ func ParseConfigRef( if configRef != nil && configRef.SecretName != "" { secretNames = append(secretNames, configRef.SecretName) } - if err := watches.WatchUserProvidedSecrets(resourceNsn, driver.DynamicWatches(), ConfigRefWatchName(resourceNsn), secretNames); err != nil { + if err := watches.WatchUserProvidedSecrets(resourceNsn, driver.DynamicWatches(), configRefWatchName(resourceNsn), secretNames); err != nil { return nil, err } @@ -66,7 +86,9 @@ func ParseConfigRef( driver.Recorder().Event(resource, corev1.EventTypeWarning, events.EventReasonUnexpected, msg) return nil, errors.New(msg) } - parsed, err := settings.ParseConfig(data) + + parsed, err := uyaml.NewConfig(data, configOptions...) + if err != nil { msg := fmt.Sprintf("unable to parse %s in configRef secret %s/%s", secretKey, namespace, configRef.SecretName) driver.Recorder().Event(resource, corev1.EventTypeWarning, events.EventReasonUnexpected, msg) diff --git a/pkg/controller/logstash/driver.go b/pkg/controller/logstash/driver.go index 9fec5b89e4..afc12e7d67 100644 --- a/pkg/controller/logstash/driver.go +++ b/pkg/controller/logstash/driver.go @@ -88,6 +88,10 @@ func internalReconcile(params Params) (*reconciler.Results, logstashv1alpha1.Log return results.WithError(err), params.Status } + if err := reconcilePipeline(params, configHash); err != nil { + return results.WithError(err), params.Status + } + podTemplate := buildPodTemplate(params, configHash) return reconcileStatefulSet(params, podTemplate) } diff --git a/pkg/controller/logstash/logstash_controller.go b/pkg/controller/logstash/logstash_controller.go index 2eea84378d..6c71683b51 100644 --- a/pkg/controller/logstash/logstash_controller.go +++ b/pkg/controller/logstash/logstash_controller.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/reconciler" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/tracing" "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/watches" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/pipelines" "github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s" logconf "github.com/elastic/cloud-on-k8s/v2/pkg/utils/log" ) @@ -196,5 +197,6 @@ func (r *ReconcileLogstash) validate(ctx context.Context, logstash logstashv1alp func (r *ReconcileLogstash) onDelete(ctx context.Context, obj types.NamespacedName) error { r.dynamicWatches.Secrets.RemoveHandlerForKey(keystore.SecureSettingsWatchName(obj)) r.dynamicWatches.Secrets.RemoveHandlerForKey(common.ConfigRefWatchName(obj)) + r.dynamicWatches.Secrets.RemoveHandlerForKey(pipelines.RefWatchName(obj)) return reconciler.GarbageCollectSoftOwnedSecrets(ctx, r.Client, obj, logstashv1alpha1.Kind) } diff --git a/pkg/controller/logstash/pipeline.go b/pkg/controller/logstash/pipeline.go new file mode 100644 index 0000000000..ddc696b29d --- /dev/null +++ b/pkg/controller/logstash/pipeline.go @@ -0,0 +1,83 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package logstash + +import ( + "hash" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/labels" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/reconciler" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/tracing" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/pipelines" +) + +func reconcilePipeline(params Params, configHash hash.Hash) error { + defer tracing.Span(¶ms.Context)() + + cfgBytes, err := buildPipeline(params) + if err != nil { + return err + } + + expected := corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: params.Logstash.Namespace, + Name: logstashv1alpha1.PipelineSecretName(params.Logstash.Name), + Labels: labels.AddCredentialsLabel(NewLabels(params.Logstash)), + }, + Data: map[string][]byte{ + PipelineFileName: cfgBytes, + }, + } + + if _, err = reconciler.ReconcileSecret(params.Context, params.Client, expected, ¶ms.Logstash); err != nil { + return err + } + + _, _ = configHash.Write(cfgBytes) + + return nil +} + +func buildPipeline(params Params) ([]byte, error) { + userProvidedCfg, err := getUserPipeline(params) + if err != nil { + return nil, err + } + + if userProvidedCfg != nil { + return userProvidedCfg.Render() + } + + cfg := defaultPipeline + return cfg.Render() +} + +// getUserPipeline extracts the pipeline either from the spec `pipeline` field or from the Secret referenced by spec +// `pipelineRef` field. +func getUserPipeline(params Params) (*pipelines.Config, error) { + if params.Logstash.Spec.Pipelines != nil { + pipes := make([]map[string]interface{}, 0, len(params.Logstash.Spec.Pipelines)) + for _, p := range params.Logstash.Spec.Pipelines { + pipes = append(pipes, p.Data) + } + + return pipelines.FromSpec(pipes) + } + return pipelines.ParsePipelinesRef(params, ¶ms.Logstash, params.Logstash.Spec.PipelinesRef, PipelineFileName) +} + +var ( + defaultPipeline = pipelines.MustFromSpec([]map[string]string{ + { + "pipeline.id": "main", + "path.config": "/usr/share/logstash/pipeline", + }, + }) +) diff --git a/pkg/controller/logstash/pipeline_test.go b/pkg/controller/logstash/pipeline_test.go new file mode 100644 index 0000000000..59d28a5bf2 --- /dev/null +++ b/pkg/controller/logstash/pipeline_test.go @@ -0,0 +1,124 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package logstash + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/record" + + commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1" + logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/watches" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/pipelines" + "github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s" +) + +func Test_buildPipeline(t *testing.T) { + for _, tt := range []struct { + name string + pipelines []commonv1.Config + pipelinesRef *commonv1.ConfigSource + client k8s.Client + want *pipelines.Config + wantErr bool + }{ + { + name: "no user pipeline", + want: defaultPipeline, + }, + { + name: "pipeline populated", + pipelines: []commonv1.Config{ + {Data: map[string]interface{}{"pipeline.id": "main"}}, + }, + want: pipelines.MustParse([]byte(`- "pipeline.id": "main"`)), + }, + { + name: "pipelinesref populated - no secret", + pipelinesRef: &commonv1.ConfigSource{ + SecretRef: commonv1.SecretRef{ + SecretName: "my-secret-pipeline", + }, + }, + client: k8s.NewFakeClient(), + want: pipelines.EmptyConfig(), + wantErr: true, + }, + { + name: "pipelinesref populated - no secret key", + pipelinesRef: &commonv1.ConfigSource{ + SecretRef: commonv1.SecretRef{ + SecretName: "my-secret-pipeline", + }, + }, + client: k8s.NewFakeClient(&corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-secret-pipeline", + }, + }), + want: pipelines.EmptyConfig(), + wantErr: true, + }, + { + name: "pipelinesref populated - malformed config", + pipelinesRef: &commonv1.ConfigSource{ + SecretRef: commonv1.SecretRef{ + SecretName: "my-secret-pipeline-2", + }, + }, + client: k8s.NewFakeClient(&corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-secret-pipeline-2", + }, + Data: map[string][]byte{"pipelines.yml": []byte("something:bad:value")}, + }), + want: pipelines.EmptyConfig(), + wantErr: true, + }, + { + name: "pipelinesref populated", + pipelinesRef: &commonv1.ConfigSource{ + SecretRef: commonv1.SecretRef{ + SecretName: "my-secret-pipeline-2", + }, + }, + client: k8s.NewFakeClient(&corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-secret-pipeline-2", + }, + Data: map[string][]byte{"pipelines.yml": []byte(`- "pipeline.id": "main"`)}, + }), + want: pipelines.MustParse([]byte(`- "pipeline.id": "main"`)), + }, + } { + t.Run(tt.name, func(t *testing.T) { + params := Params{ + Context: context.Background(), + Client: tt.client, + EventRecorder: &record.FakeRecorder{}, + Watches: watches.NewDynamicWatches(), + Logstash: logstashv1alpha1.Logstash{ + Spec: logstashv1alpha1.LogstashSpec{ + Pipelines: tt.pipelines, + PipelinesRef: tt.pipelinesRef, + }, + }, + } + + gotYaml, gotErr := buildPipeline(params) + diff, err := tt.want.Diff(pipelines.MustParse(gotYaml)) + if diff { + t.Errorf("buildPipeline() got unexpected differences: %v", err) + } + + require.Equal(t, tt.wantErr, gotErr != nil) + }) + } +} diff --git a/pkg/controller/logstash/pipelines/config.go b/pkg/controller/logstash/pipelines/config.go new file mode 100644 index 0000000000..f92de18aa5 --- /dev/null +++ b/pkg/controller/logstash/pipelines/config.go @@ -0,0 +1,135 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package pipelines + +import ( + "fmt" + "reflect" + + "github.com/elastic/go-ucfg" + uyaml "github.com/elastic/go-ucfg/yaml" + "gopkg.in/yaml.v3" +) + +// Config contains configuration for Logstash pipeline ("pipelines.yml"), +// `.` in between the key, pipeline.id, is treated as string +// pipelines.yml is expected an array of pipeline definition. +type Config ucfg.Config + +// Options are config options for the YAML file. +var Options = []ucfg.Option{ucfg.AppendValues} + +// EmptyConfig creates a new empty config. +func EmptyConfig() *Config { + return fromConfig(ucfg.New()) +} + +// FromSpec creates a new pipeline from spec. +func FromSpec(cfg interface{}) (*Config, error) { + config, err := ucfg.NewFrom(cfg, Options...) + if err != nil { + return nil, err + } + return fromConfig(config), nil +} + +// MustFromSpec creates a new pipeline and panics on errors. +// Use for testing only. +func MustFromSpec(cfg interface{}) *Config { + config, err := FromSpec(cfg) + if err != nil { + panic(err) + } + return config +} + +// Parse parses the given pipeline content into a PipelinesConfig. +// Expects content to be in YAML format. +func Parse(yml []byte) (*Config, error) { + config, err := uyaml.NewConfig(yml, Options...) + if err != nil { + return nil, err + } + return fromConfig(config), nil +} + +// MustParse parses the given pipeline content into a Pipelines. +// Expects content to be in YAML format. Panics on error. +// Use for testing only. +func MustParse(yml []byte) *Config { + config, err := uyaml.NewConfig(yml, Options...) + if err != nil { + panic(err) + } + return fromConfig(config) +} + +// Render returns the content of the configuration file, +// with fields sorted alphabetically. +func (c *Config) Render() ([]byte, error) { + if c == nil { + return []byte{}, nil + } + var out []interface{} + if err := c.asUCfg().Unpack(&out); err != nil { + return []byte{}, err + } + return yaml.Marshal(out) +} + +func (c *Config) asUCfg() *ucfg.Config { + return (*ucfg.Config)(c) +} + +func fromConfig(in *ucfg.Config) *Config { + return (*Config)(in) +} + +// Diff returns true if the key/value or the sequence of two PipelinesConfig are different. +// Use for testing only. +func (c *Config) Diff(c2 *Config) (bool, error) { + if c == c2 { + return false, nil + } + if c == nil && c2 != nil { + return true, fmt.Errorf("empty lhs config %s", c2.asUCfg().FlattenedKeys(Options...)) + } + if c != nil && c2 == nil { + return true, fmt.Errorf("empty rhs config %s", c.asUCfg().FlattenedKeys(Options...)) + } + + var s []map[string]interface{} + var s2 []map[string]interface{} + err := c.asUCfg().Unpack(&s, Options...) + if err != nil { + return true, err + } + err = c2.asUCfg().Unpack(&s2, Options...) + if err != nil { + return true, err + } + + return diffSlice(s, s2) +} + +// diffSlice returns true if the key/value or the sequence of two PipelinesConfig are different. +func diffSlice(s1, s2 []map[string]interface{}) (bool, error) { + if len(s1) != len(s2) { + return true, fmt.Errorf("array size doesn't match %d, %d", len(s1), len(s2)) + } + var diff []string + for i, m := range s1 { + m2 := s2[i] + if eq := reflect.DeepEqual(m, m2); !eq { + diff = append(diff, fmt.Sprintf("%s vs %s, ", m, m2)) + } + } + + if len(diff) > 0 { + return true, fmt.Errorf("there are %d differences. %s", len(diff), diff) + } + + return false, nil +} diff --git a/pkg/controller/logstash/pipelines/config_test.go b/pkg/controller/logstash/pipelines/config_test.go new file mode 100644 index 0000000000..43c16835bb --- /dev/null +++ b/pkg/controller/logstash/pipelines/config_test.go @@ -0,0 +1,287 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package pipelines + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestPipelinesConfig_Render(t *testing.T) { + config := MustFromSpec( + []map[string]interface{}{ + { + "pipeline.id": "demo", + "config.string": "input { exec { command => \"uptime\" interval => 5 } } output { stdout{} }", + }, + { + "pipeline.id": "standard", + "pipeline.workers": 1, + "queue.type": "persisted", + "queue.drain": true, + "dead_letter_queue.max_bytes": "1024mb", + "path.config": "/tmp/logstash/*.config", + }, + }, + ) + output, err := config.Render() + require.NoError(t, err) + expected := []byte(`- config.string: input { exec { command => "uptime" interval => 5 } } output { stdout{} } + pipeline.id: demo +- dead_letter_queue.max_bytes: 1024mb + path.config: /tmp/logstash/*.config + pipeline.id: standard + pipeline.workers: 1 + queue.drain: true + queue.type: persisted +`) + require.Equal(t, string(expected), string(output)) +} + +func TestParsePipelinesConfig(t *testing.T) { + tests := []struct { + name string + input string + want *Config + wantErr bool + }{ + { + name: "no input", + input: "", + want: EmptyConfig(), + wantErr: false, + }, + { + name: "simple input", + input: "- pipeline.id: demo\n config.string: input { exec { command => \"${ENV}\" interval => 5 } }", + want: MustFromSpec( + []map[string]interface{}{ + { + "pipeline.id": "demo", + "config.string": "input { exec { command => \"${ENV}\" interval => 5 } }", + }, + }, + ), + wantErr: false, + }, + { + name: "number input", + input: "- pipeline.id: main\n pipeline.workers: 4", + want: MustFromSpec( + []map[string]interface{}{ + { + "pipeline.id": "main", + "pipeline.workers": 4, + }, + }, + ), + wantErr: false, + }, + { + name: "boolean input", + input: "- pipeline.id: main\n queue.drain: false", + want: MustFromSpec( + []map[string]interface{}{ + { + "pipeline.id": "main", + "queue.drain": false, + }, + }, + ), + wantErr: false, + }, + { + name: "trim whitespaces between key and value", + input: "- pipeline.id : demo \n path.config : /tmp/logstash/*.config ", + want: MustFromSpec( + []map[string]interface{}{ + { + "pipeline.id": "demo", + "path.config": "/tmp/logstash/*.config", + }, + }, + ), + wantErr: false, + }, + { + name: "tabs are invalid in YAML", + input: "\ta: b \n c:d ", + wantErr: true, + }, + { + name: "trim newlines", + input: "- pipeline.id: demo \n\n- pipeline.id: demo2 \n", + want: MustFromSpec( + []map[string]interface{}{ + {"pipeline.id": "demo"}, + {"pipeline.id": "demo2"}, + }, + ), + wantErr: false, + }, + { + name: "ignore comments", + input: "- pipeline.id: demo \n#this is a comment\n pipeline.workers: \"1\"\n", + want: MustFromSpec( + []map[string]interface{}{ + { + "pipeline.id": "demo", + "pipeline.workers": "1", + }, + }, + ), + wantErr: false, + }, + { + name: "support quotes", + input: `- "pipeline.id": "quote"`, + want: MustFromSpec( + []map[string]interface{}{ + {"pipeline.id": "quote"}, + }, + ), + wantErr: false, + }, + { + name: "support special characters", + input: `- config.string: "${node.ip}%.:=+è! /"`, + want: MustFromSpec( + []map[string]interface{}{ + {"config.string": `${node.ip}%.:=+è! /`}, + }, + ), + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := Parse([]byte(tt.input)) + if (err != nil) != tt.wantErr { + t.Errorf("Parse() error = %v, wantErr %v", err, tt.wantErr) + return + } + + if got == tt.want { + return + } + + if diff, _ := got.Diff(tt.want); diff { + gotRendered, err := got.Render() + require.NoError(t, err) + wantRendered, err := tt.want.Render() + require.NoError(t, err) + t.Errorf("Parse(), want: %s, got: %s", wantRendered, gotRendered) + } + }) + } +} + +func TestPipelinesConfig_Diff(t *testing.T) { + tests := []struct { + name string + c *Config + c2 *Config + wantDiff bool + }{ + { + name: "nil diff", + c: nil, + c2: nil, + wantDiff: false, + }, + { + name: "lhs nil", + c: nil, + c2: MustFromSpec( + []map[string]interface{}{ + {"a": "a"}, + {"b": "b"}, + }, + ), + wantDiff: true, + }, + { + name: "rhs nil", + c: MustFromSpec( + []map[string]interface{}{ + {"a": "a"}, + }, + ), + c2: nil, + wantDiff: true, + }, + { + name: "same multi key value", + c: MustFromSpec( + []map[string]interface{}{ + {"a": "a", "b": "b", "c": 1, "d": true}, + }, + ), + c2: MustFromSpec( + []map[string]interface{}{ + {"c": 1, "b": "b", "a": "a", "d": true}, + }, + ), + wantDiff: false, + }, + { + name: "different value", + c: MustFromSpec( + []map[string]interface{}{ + {"a": "a"}, + }, + ), + c2: MustFromSpec( + []map[string]interface{}{ + {"a": "b"}, + }, + ), + wantDiff: true, + }, + { + name: "array size different", + c: MustFromSpec( + []map[string]interface{}{ + {"a": "a"}, + }, + ), + c2: MustFromSpec( + []map[string]interface{}{ + {"a": "a"}, + {"a": "a"}, + }, + ), + wantDiff: true, + }, + { + name: "respects list order", + c: MustFromSpec( + []map[string]interface{}{ + {"a": "a"}, + {"b": "b"}, + }, + ), + c2: MustFromSpec( + []map[string]interface{}{ + {"b": "b"}, + {"a": "a"}, + }, + ), + wantDiff: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + diff, err := tt.c.Diff(tt.c2) + if (err != nil) != tt.wantDiff { + t.Errorf("Diff() got unexpected differences. wantDiff: %t, err: %v", tt.wantDiff, err) + return + } + + require.Equal(t, tt.wantDiff, diff) + }) + } +} diff --git a/pkg/controller/logstash/pipelines/ref.go b/pkg/controller/logstash/pipelines/ref.go new file mode 100644 index 0000000000..4cf753c19b --- /dev/null +++ b/pkg/controller/logstash/pipelines/ref.go @@ -0,0 +1,37 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package pipelines + +import ( + "fmt" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + + commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/driver" +) + +// RefWatchName returns the name of the watch registered on the secret referenced in `pipelinesRef`. +func RefWatchName(resource types.NamespacedName) string { + return fmt.Sprintf("%s-%s-pipelinesref", resource.Namespace, resource.Name) +} + +// ParsePipelinesRef retrieves the content of a secret referenced in `pipelinesRef`, sets up dynamic watches for that secret, +// and parses the secret content into a PipelinesConfig. +func ParsePipelinesRef( + driver driver.Interface, + resource runtime.Object, + pipelinesRef *commonv1.ConfigSource, + secretKey string, // retrieve config data from that entry in the secret +) (*Config, error) { + parsed, err := common.ParseConfigRefToConfig(driver, resource, pipelinesRef, secretKey, RefWatchName, Options) + if err != nil { + return nil, err + } + + return (*Config)(parsed), nil +} diff --git a/pkg/controller/logstash/pipelines/ref_test.go b/pkg/controller/logstash/pipelines/ref_test.go new file mode 100644 index 0000000000..f71cdd1dae --- /dev/null +++ b/pkg/controller/logstash/pipelines/ref_test.go @@ -0,0 +1,191 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package pipelines + +import ( + "testing" + + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + + commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/driver" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/watches" + "github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s" +) + +type fakeDriver struct { + client k8s.Client + watches watches.DynamicWatches + recorder record.EventRecorder +} + +func (f fakeDriver) K8sClient() k8s.Client { + return f.client +} + +func (f fakeDriver) DynamicWatches() watches.DynamicWatches { + return f.watches +} + +func (f fakeDriver) Recorder() record.EventRecorder { + return f.recorder +} + +var _ driver.Interface = fakeDriver{} + +func TestParsePipelinesRef(t *testing.T) { + // any resource Kind would work here (eg. Beat, EnterpriseSearch, etc.) + resNsn := types.NamespacedName{Namespace: "ns", Name: "resource"} + res := corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: resNsn.Namespace, Name: resNsn.Name}} + watchName := RefWatchName(resNsn) + + tests := []struct { + name string + pipelinesRef *commonv1.ConfigSource + secretKey string + runtimeObjs []runtime.Object + want *Config + wantErr bool + existingWatches []string + wantWatches []string + wantEvent string + }{ + { + name: "happy path", + pipelinesRef: &commonv1.ConfigSource{SecretRef: commonv1.SecretRef{SecretName: "my-secret"}}, + secretKey: "configFile.yml", + runtimeObjs: []runtime.Object{ + &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "my-secret"}, + Data: map[string][]byte{ + "configFile.yml": []byte(`- "pipeline.id": "main"`), + }}, + }, + want: MustParse([]byte(`- "pipeline.id": "main"`)), + wantWatches: []string{watchName}, + }, + { + name: "happy path, secret already watched", + pipelinesRef: &commonv1.ConfigSource{SecretRef: commonv1.SecretRef{SecretName: "my-secret"}}, + secretKey: "configFile.yml", + runtimeObjs: []runtime.Object{ + &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "my-secret"}, + Data: map[string][]byte{ + "configFile.yml": []byte(`- "pipeline.id": "main"`), + }}, + }, + want: MustParse([]byte(`- "pipeline.id": "main"`)), + existingWatches: []string{watchName}, + wantWatches: []string{watchName}, + }, + { + name: "no pipelinesRef specified", + pipelinesRef: nil, + secretKey: "configFile.yml", + runtimeObjs: []runtime.Object{ + &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "my-secret"}, + Data: map[string][]byte{ + "configFile.yml": []byte(`- "pipeline.id": "main"`), + }}, + }, + want: nil, + wantWatches: []string{}, + }, + { + name: "no pipelinesRef specified: clear existing watches", + pipelinesRef: nil, + secretKey: "configFile.yml", + runtimeObjs: []runtime.Object{ + &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "my-secret"}, + Data: map[string][]byte{ + "configFile.yml": []byte(`- "pipeline.id": "main"`), + }}, + }, + want: nil, + existingWatches: []string{watchName}, + wantWatches: []string{}, + }, + { + name: "secret not found: error out but watch the future secret", + pipelinesRef: &commonv1.ConfigSource{SecretRef: commonv1.SecretRef{SecretName: "my-secret"}}, + secretKey: "configFile.yml", + runtimeObjs: []runtime.Object{}, + want: nil, + wantErr: true, + wantWatches: []string{watchName}, + }, + { + name: "missing key in the referenced secret: error out, watch the secret and emit an event", + pipelinesRef: &commonv1.ConfigSource{SecretRef: commonv1.SecretRef{SecretName: "my-secret"}}, + secretKey: "configFile.yml", + runtimeObjs: []runtime.Object{ + &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "my-secret"}, + Data: map[string][]byte{ + "unexpected-key": []byte(`- "pipeline.id": "main"`), + }}, + }, + wantErr: true, + wantWatches: []string{watchName}, + wantEvent: "Warning Unexpected unable to parse configRef secret ns/my-secret: missing key configFile.yml", + }, + { + name: "invalid config the referenced secret: error out, watch the secret and emit an event", + pipelinesRef: &commonv1.ConfigSource{SecretRef: commonv1.SecretRef{SecretName: "my-secret"}}, + secretKey: "configFile.yml", + runtimeObjs: []runtime.Object{ + &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "my-secret"}, + Data: map[string][]byte{ + "configFile.yml": []byte("this.is invalid config"), + }}, + }, + wantErr: true, + wantWatches: []string{watchName}, + wantEvent: "Warning Unexpected unable to parse configFile.yml in configRef secret ns/my-secret", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fakeRecorder := record.NewFakeRecorder(10) + w := watches.NewDynamicWatches() + for _, existingWatch := range tt.existingWatches { + require.NoError(t, w.Secrets.AddHandler(watches.NamedWatch{Name: existingWatch})) + } + d := fakeDriver{ + client: k8s.NewFakeClient(tt.runtimeObjs...), + watches: w, + recorder: fakeRecorder, + } + got, err := ParsePipelinesRef(d, &res, tt.pipelinesRef, tt.secretKey) + if (err != nil) != tt.wantErr { + t.Errorf("ParsePipelinesRef() error = %v, wantErr %v", err, tt.wantErr) + return + } + require.Equal(t, tt.want, got) + require.Equal(t, tt.wantWatches, d.watches.Secrets.Registrations()) + + if tt.wantEvent != "" { + require.Equal(t, tt.wantEvent, <-fakeRecorder.Events) + } else { + // no event expected + select { + case e := <-fakeRecorder.Events: + require.Fail(t, "no event expected but got one", "event", e) + default: + // ok + } + } + }) + } +} diff --git a/pkg/controller/logstash/pod.go b/pkg/controller/logstash/pod.go index 00ff46e823..af9c6d53f2 100644 --- a/pkg/controller/logstash/pod.go +++ b/pkg/controller/logstash/pod.go @@ -31,6 +31,9 @@ const ( LogstashConfigVolumeName = "logstash" LogstashConfigFileName = "logstash.yml" + PipelineVolumeName = "pipeline" + PipelineFileName = "pipelines.yml" + // ConfigHashAnnotationName is an annotation used to store the Logstash config hash. ConfigHashAnnotationName = "logstash.k8s.elastic.co/config-hash" @@ -63,6 +66,13 @@ func buildPodTemplate(params Params, configHash hash.Hash32) corev1.PodTemplateS path.Join(ConfigMountPath, LogstashConfigFileName), LogstashConfigFileName, 0644), + // volume with logstash pipeline file + volume.NewSecretVolume( + logstashv1alpha1.PipelineSecretName(params.Logstash.Name), + PipelineVolumeName, + path.Join(ConfigMountPath, PipelineFileName), + PipelineFileName, + 0644), } labels := maps.Merge(params.Logstash.GetIdentityLabels(), map[string]string{ diff --git a/test/e2e/logstash/logstash_test.go b/test/e2e/logstash/logstash_test.go index 573af6b5f6..7f56b411b2 100644 --- a/test/e2e/logstash/logstash_test.go +++ b/test/e2e/logstash/logstash_test.go @@ -9,11 +9,11 @@ package logstash import ( "testing" - corev1 "k8s.io/api/core/v1" commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1" logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" "github.com/elastic/cloud-on-k8s/v2/test/e2e/test" "github.com/elastic/cloud-on-k8s/v2/test/e2e/test/logstash" + corev1 "k8s.io/api/core/v1" ) func TestSingleLogstash(t *testing.T) { @@ -25,7 +25,7 @@ func TestSingleLogstash(t *testing.T) { func TestLogstashWithCustomService(t *testing.T) { name := "test-multiple-custom-logstash" - service := logstashv1alpha1.LogstashService { + service := logstashv1alpha1.LogstashService{ Name: "test", Service: commonv1.ServiceTemplate{ Spec: corev1.ServiceSpec{ @@ -44,7 +44,7 @@ func TestLogstashWithCustomService(t *testing.T) { func TestLogstashWithReworkedApiService(t *testing.T) { name := "test-multiple-custom-logstash" - service := logstashv1alpha1.LogstashService { + service := logstashv1alpha1.LogstashService{ Name: "api", Service: commonv1.ServiceTemplate{ Spec: corev1.ServiceSpec{ @@ -63,7 +63,7 @@ func TestLogstashWithReworkedApiService(t *testing.T) { func TestLogstashWithCustomServiceAndAmendedApi(t *testing.T) { name := "test-multiple-custom-logstash" - customService := logstashv1alpha1.LogstashService { + customService := logstashv1alpha1.LogstashService{ Name: "test", Service: commonv1.ServiceTemplate{ Spec: corev1.ServiceSpec{ @@ -74,7 +74,7 @@ func TestLogstashWithCustomServiceAndAmendedApi(t *testing.T) { }, } - apiService := logstashv1alpha1.LogstashService { + apiService := logstashv1alpha1.LogstashService{ Name: "api", Service: commonv1.ServiceTemplate{ Spec: corev1.ServiceSpec{ @@ -92,7 +92,6 @@ func TestLogstashWithCustomServiceAndAmendedApi(t *testing.T) { test.Sequence(nil, test.EmptySteps, logstashBuilder).RunSequential(t) } - func TestMultipleLogstashes(t *testing.T) { name := "test-multiple-logstashes" logstashBuilder := logstash.NewBuilder(name). diff --git a/test/e2e/logstash/pipeline_test.go b/test/e2e/logstash/pipeline_test.go new file mode 100644 index 0000000000..576dd11a77 --- /dev/null +++ b/test/e2e/logstash/pipeline_test.go @@ -0,0 +1,157 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +//go:build logstash || e2e + +package logstash + +import ( + corev1 "k8s.io/api/core/v1" + "testing" + + commonv1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1" + "github.com/elastic/cloud-on-k8s/v2/test/e2e/test" + "github.com/elastic/cloud-on-k8s/v2/test/e2e/test/logstash" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// TestPipelineConfigRefLogstash PipelineRef should be able to take pipelines.yaml from Secret. +func TestPipelineConfigRefLogstash(t *testing.T) { + secretName := "ls-generator-pipeline" + + pipelineSecret := corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: secretName, + Namespace: test.Ctx().ManagedNamespace(0), + }, + StringData: map[string]string{ + "pipelines.yml": ` +- pipeline.id: generator + pipeline.workers: 1 + queue.drain: false + config.string: input { generator {} } filter { sleep { time => 10 } } output { stdout { codec => dots } } +- pipeline.id: main + config.string: input { stdin{} } output { stdout{} }`, + }, + } + + before := test.StepsFunc(func(k *test.K8sClient) test.StepList { + return test.StepList{}.WithStep(test.Step{ + Name: "Create pipeline secret", + Test: test.Eventually(func() error { + return k.CreateOrUpdateSecrets(pipelineSecret) + }), + }) + }) + + name := "test-pipeline-ref" + b := logstash.NewBuilder(name). + WithNodeCount(1). + WithPipelinesConfigRef(commonv1.ConfigSource{ + SecretRef: commonv1.SecretRef{ + SecretName: secretName, + }, + }) + + steps := test.StepsFunc(func(k *test.K8sClient) test.StepList { + return test.StepList{ + b.CheckMetricsRequest(k, + logstash.Request{ + Name: "pipeline [generator]", + Path: "/_node/pipelines/generator", + }, + logstash.Want{ + Status: "green", + Match: map[string]string{"pipelines.generator.workers": "1"}, + }), + test.Step{ + Name: "Delete pipeline secret", + Test: test.Eventually(func() error { + return k.DeleteSecrets(pipelineSecret) + }), + }, + } + }) + + test.Sequence(before, steps, b).RunSequential(t) +} + +// TestPipelineConfigLogstash Pipeline should be able to pass to Logstash via VolumeMount. +func TestPipelineConfigLogstash(t *testing.T) { + secretName := "ls-split-pipe" + + pipelineSecret := corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: secretName, + Namespace: test.Ctx().ManagedNamespace(0), + }, + StringData: map[string]string{ + "split.conf": "input { exec { command => \"uptime\" interval => 10 } } output { stdout{} }", + }, + } + + before := test.StepsFunc(func(k *test.K8sClient) test.StepList { + return test.StepList{}.WithStep(test.Step{ + Name: "Create pipeline secret", + Test: test.Eventually(func() error { + return k.CreateOrUpdateSecrets(pipelineSecret) + }), + }) + }) + + name := "test-split-pipeline" + volName := "ls-pipe-vol" + mountPath := "/usr/share/logstash/pipeline" + + b := logstash.NewBuilder(name). + WithNodeCount(1). + WithPipelines([]commonv1.Config{ + { + Data: map[string]interface{}{ + "pipeline.id": "split", + "path.config": mountPath, + }, + }, + { + Data: map[string]interface{}{ + "pipeline.id": "main", + "config.string": "input { stdin{} } output { stdout{} }", + }, + }, + }). + WithVolumes(corev1.Volume{ + Name: volName, + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: secretName, + }, + }, + }). + WithVolumeMounts(corev1.VolumeMount{ + Name: volName, + MountPath: mountPath, + }) + + steps := test.StepsFunc(func(k *test.K8sClient) test.StepList { + return test.StepList{ + b.CheckMetricsRequest(k, + logstash.Request{ + Name: "pipeline [split]", + Path: "/_node/pipelines/split", + }, + logstash.Want{ + Status: "green", + Match: map[string]string{"pipelines.split.batch_size": "125"}, + }), + test.Step{ + Name: "Delete pipeline secret", + Test: test.Eventually(func() error { + return k.DeleteSecrets(pipelineSecret) + }), + }, + } + }) + + test.Sequence(before, steps, b).RunSequential(t) +} diff --git a/test/e2e/test/k8s_client.go b/test/e2e/test/k8s_client.go index f3b4d13ddf..bb56fee30b 100644 --- a/test/e2e/test/k8s_client.go +++ b/test/e2e/test/k8s_client.go @@ -363,6 +363,15 @@ func (k K8sClient) CreateOrUpdateSecrets(secrets ...corev1.Secret) error { return nil } +func (k *K8sClient) DeleteSecrets(secrets ...corev1.Secret) error { + for i := range secrets { + if err := k.Client.Delete(context.Background(), &secrets[i]); err != nil { + return err + } + } + return nil +} + func (k K8sClient) CreateOrUpdate(objs ...client.Object) error { for _, obj := range objs { // create a copy to ensure that the original object is not modified diff --git a/test/e2e/test/logstash/builder.go b/test/e2e/test/logstash/builder.go index b92fea66a4..07d0ee21de 100644 --- a/test/e2e/test/logstash/builder.go +++ b/test/e2e/test/logstash/builder.go @@ -5,6 +5,7 @@ package logstash import ( + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/rand" @@ -116,6 +117,40 @@ func (b Builder) WithServices(services ...logstashv1alpha1.LogstashService) Buil return b } +func (b Builder) WithPipelines(pipelines []commonv1.Config) Builder { + b.Logstash.Spec.Pipelines = pipelines + return b +} + +func (b Builder) WithPipelinesConfigRef(ref commonv1.ConfigSource) Builder { + b.Logstash.Spec.PipelinesRef = &ref + return b +} + +func (b Builder) WithVolumes(vols ...corev1.Volume) Builder { + b.Logstash.Spec.PodTemplate.Spec.Volumes = append(b.Logstash.Spec.PodTemplate.Spec.Volumes, vols...) + return b +} + +func (b Builder) WithVolumeMounts(mounts ...corev1.VolumeMount) Builder { + if b.Logstash.Spec.PodTemplate.Spec.Containers == nil { + b.Logstash.Spec.PodTemplate.Spec.Containers = []corev1.Container{ + { + Name: "logstash", + VolumeMounts: mounts, + }, + } + return b + } + + if b.Logstash.Spec.PodTemplate.Spec.Containers[0].VolumeMounts == nil { + b.Logstash.Spec.PodTemplate.Spec.Containers[0].VolumeMounts = []corev1.VolumeMount{} + } + b.Logstash.Spec.PodTemplate.Spec.Containers[0].VolumeMounts = append(b.Logstash.Spec.PodTemplate.Spec.Containers[0].VolumeMounts, mounts...) + + return b +} + func (b Builder) WithMonitoring(metricsESRef commonv1.ObjectSelector, logsESRef commonv1.ObjectSelector) Builder { b.Logstash.Spec.Monitoring.Metrics.ElasticsearchRefs = []commonv1.ObjectSelector{metricsESRef} b.Logstash.Spec.Monitoring.Logs.ElasticsearchRefs = []commonv1.ObjectSelector{logsESRef} diff --git a/test/e2e/test/logstash/checks.go b/test/e2e/test/logstash/checks.go index 9d5e900498..19524fa3fd 100644 --- a/test/e2e/test/logstash/checks.go +++ b/test/e2e/test/logstash/checks.go @@ -13,13 +13,21 @@ import ( v1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/common/v1" logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1" + "github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/settings" "github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s" "github.com/elastic/cloud-on-k8s/v2/test/e2e/test" ) -type logstashStatus struct { - Version string `json:"version"` - Status string `json:"status"` +type Request struct { + Name string + Path string +} + +type Want struct { + Status string + // Key is field path of ucfg.Config. Value is the expected string + // example, pipelines.demo.batch_size : 2 + Match map[string]string } // CheckSecrets checks that expected secrets have been created. @@ -36,6 +44,14 @@ func CheckSecrets(b Builder, k *test.K8sClient) test.Step { "logstash.k8s.elastic.co/name": logstashName, }, }, + { + Name: logstashName + "-ls-pipeline", + Keys: []string{"pipelines.yml"}, + Labels: map[string]string{ + "eck.k8s.elastic.co/credentials": "true", + "logstash.k8s.elastic.co/name": logstashName, + }, + }, } return expected }) @@ -83,30 +99,75 @@ func CheckStatus(b Builder, k *test.K8sClient) test.Step { } func (b Builder) CheckStackTestSteps(k *test.K8sClient) test.StepList { - println(test.Ctx().TestTimeout) return test.StepList{ - { - Name: "Logstash should respond to requests", - Test: test.Eventually(func() error { - client, err := NewLogstashClient(b.Logstash, k) - if err != nil { - return err - } - bytes, err := DoRequest(client, b.Logstash, "GET", "/") + b.CheckMetricsRequest(k, + Request{ + Name: "metrics", + Path: "/", + }, + Want{ + Status: "green", + }), + b.CheckMetricsRequest(k, + Request{ + Name: "default pipeline", + Path: "/_node/pipelines/main", + }, + Want{ + Status: "green", + Match: map[string]string{"pipelines.main.batch_size": "125"}, + }), + } +} + +func (b Builder) CheckMetricsRequest(k *test.K8sClient, req Request, want Want) test.Step { + return test.Step{ + Name: fmt.Sprintf("Logstash should respond to %s requests", req.Name), + Test: test.Eventually(func() error { + // send request and parse to map obj + client, err := NewLogstashClient(b.Logstash, k) + if err != nil { + return err + } + + bytes, err := DoRequest(client, b.Logstash, "GET", req.Path) + if err != nil { + return err + } + + var response map[string]interface{} + err = json.Unmarshal(bytes, &response) + if err != nil { + return err + } + + // parse response to ucfg.Config for traverse + res, err := settings.NewCanonicalConfigFrom(response) + if err != nil { + return err + } + + // check status + status, err := res.String("status") + if err != nil { + return err + } + if status != want.Status { + return fmt.Errorf("expected %s but got %s", want.Status, status) + } + + // check expected string + for k, v := range want.Match { + str, err := res.String(k) if err != nil { return err } - var status logstashStatus - if err := json.Unmarshal(bytes, &status); err != nil { - return err - } - - if status.Status != "green" { - return fmt.Errorf("expected green but got %s", status.Status) + if str != v { + return fmt.Errorf("expected %s to be %s but got %s", k, v, str) } - return nil - }), - }, + } + return nil + }), } }