Skip to content

Commit

Permalink
Logstash pipelines (#6480)
Browse files Browse the repository at this point in the history
This PR allows configuration of Logstash pipeline.yml in CRD. pipeline.yml is an array of map of string/interface{} defining multiple pipelines. `.` in between the key is treated as string. Pipelines can be set from either inline in Pipelines or secret referencing in PipelinesRef.

e2e test: TestPipelineConfigRefLogstash, TestPipelineConfigLogstash

Co-authored-by: Rob Bavey <[email protected]>
Co-authored-by: Michael Morello <[email protected]>
  • Loading branch information
3 people authored Apr 6, 2023
1 parent 450e61c commit 7ce5d9a
Show file tree
Hide file tree
Showing 26 changed files with 1,334 additions and 31 deletions.
17 changes: 17 additions & 0 deletions config/crds/v1/all-crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9223,6 +9223,23 @@ spec:
type: array
type: object
type: object
pipelines:
description: Pipelines holds the Logstash Pipelines. At most one of
[`Pipelines`, `PipelinesRef`] can be specified.
items:
type: object
type: array
x-kubernetes-preserve-unknown-fields: true
pipelinesRef:
description: PipelinesRef contains a reference to an existing Kubernetes
Secret holding the Logstash Pipelines. Logstash pipelines must be
specified as yaml, under a single "pipelines.yml" entry. At most
one of [`Pipelines`, `PipelinesRef`] can be specified.
properties:
secretName:
description: SecretName is the name of the secret.
type: string
type: object
podTemplate:
description: PodTemplate provides customisation options for the Logstash
pods.
Expand Down
17 changes: 17 additions & 0 deletions config/crds/v1/bases/logstash.k8s.elastic.co_logstashes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,23 @@ spec:
type: array
type: object
type: object
pipelines:
description: Pipelines holds the Logstash Pipelines. At most one of
[`Pipelines`, `PipelinesRef`] can be specified.
items:
type: object
type: array
x-kubernetes-preserve-unknown-fields: true
pipelinesRef:
description: PipelinesRef contains a reference to an existing Kubernetes
Secret holding the Logstash Pipelines. Logstash pipelines must be
specified as yaml, under a single "pipelines.yml" entry. At most
one of [`Pipelines`, `PipelinesRef`] can be specified.
properties:
secretName:
description: SecretName is the name of the secret.
type: string
type: object
podTemplate:
description: PodTemplate provides customisation options for the Logstash
pods.
Expand Down
3 changes: 3 additions & 0 deletions config/samples/logstash/logstash.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ spec:
log.level: info
api.http.host: "0.0.0.0"
queue.type: memory
pipelines:
- pipeline.id: main
config.string: input { exec { command => 'uptime' interval => 10 } } output { stdout{} }
podTemplate:
spec:
containers:
Expand Down
1 change: 0 additions & 1 deletion config/webhook/manifests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,6 @@ webhooks:
- logstashes
sideEffects: None
- admissionReviewVersions:
- v1alpha1
- v1
- v1beta1
clientConfig:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9277,6 +9277,23 @@ spec:
type: array
type: object
type: object
pipelines:
description: Pipelines holds the Logstash Pipelines. At most one of
[`Pipelines`, `PipelinesRef`] can be specified.
items:
type: object
type: array
x-kubernetes-preserve-unknown-fields: true
pipelinesRef:
description: PipelinesRef contains a reference to an existing Kubernetes
Secret holding the Logstash Pipelines. Logstash pipelines must be
specified as yaml, under a single "pipelines.yml" entry. At most
one of [`Pipelines`, `PipelinesRef`] can be specified.
properties:
secretName:
description: SecretName is the name of the secret.
type: string
type: object
podTemplate:
description: PodTemplate provides customisation options for the Logstash
pods.
Expand Down
2 changes: 2 additions & 0 deletions docs/reference/api-docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1897,6 +1897,8 @@ LogstashSpec defines the desired state of Logstash
| *`image`* __string__ | Image is the Logstash Docker image to deploy. Version and Type have to match the Logstash in the image.
| *`config`* __xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-common-v1-config[$$Config$$]__ | Config holds the Logstash configuration. At most one of [`Config`, `ConfigRef`] can be specified.
| *`configRef`* __xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-common-v1-configsource[$$ConfigSource$$]__ | ConfigRef contains a reference to an existing Kubernetes Secret holding the Logstash configuration. Logstash settings must be specified as yaml, under a single "logstash.yml" entry. At most one of [`Config`, `ConfigRef`] can be specified.
| *`pipelines`* __xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-common-v1-config[$$Config$$] array__ | Pipelines holds the Logstash Pipelines. At most one of [`Pipelines`, `PipelinesRef`] can be specified.
| *`pipelinesRef`* __xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-common-v1-configsource[$$ConfigSource$$]__ | PipelinesRef contains a reference to an existing Kubernetes Secret holding the Logstash Pipelines. Logstash pipelines must be specified as yaml, under a single "pipelines.yml" entry. At most one of [`Pipelines`, `PipelinesRef`] can be specified.
| *`services`* __xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-logstash-v1alpha1-logstashservice[$$LogstashService$$] array__ | Services contains details of services that Logstash should expose - similar to the HTTP layer configuration for the rest of the stack, but also applicable for more use cases than the metrics API, as logstash may need to be opened up for other services: beats, TCP, UDP, etc, inputs
| *`monitoring`* __xref:{anchor_prefix}-github-com-elastic-cloud-on-k8s-v2-pkg-apis-common-v1-monitoring[$$Monitoring$$]__ | Monitoring enables you to collect and ship log and monitoring data of this Logstash. Metricbeat and Filebeat are deployed in the same Pod as sidecars and each one sends data to one or two different Elasticsearch monitoring clusters running in the same Kubernetes cluster.
| *`podTemplate`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#podtemplatespec-v1-core[$$PodTemplateSpec$$]__ | PodTemplate provides customisation options for the Logstash pods.
Expand Down
11 changes: 11 additions & 0 deletions pkg/apis/logstash/v1alpha1/logstash_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ type LogstashSpec struct {
// +kubebuilder:validation:Optional
ConfigRef *commonv1.ConfigSource `json:"configRef,omitempty"`

// Pipelines holds the Logstash Pipelines. At most one of [`Pipelines`, `PipelinesRef`] can be specified.
// +kubebuilder:validation:Optional
// +kubebuilder:pruning:PreserveUnknownFields
Pipelines []commonv1.Config `json:"pipelines,omitempty"`

// PipelinesRef contains a reference to an existing Kubernetes Secret holding the Logstash Pipelines.
// Logstash pipelines must be specified as yaml, under a single "pipelines.yml" entry. At most one of [`Pipelines`, `PipelinesRef`]
// can be specified.
// +kubebuilder:validation:Optional
PipelinesRef *commonv1.ConfigSource `json:"pipelinesRef,omitempty"`

// Services contains details of services that Logstash should expose - similar to the HTTP layer configuration for the
// rest of the stack, but also applicable for more use cases than the metrics API, as logstash may need to
// be opened up for other services: beats, TCP, UDP, etc, inputs
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/logstash/v1alpha1/name.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
const (
apiServiceSuffix = "api"
configSuffix = "config"
pipelineSuffix = "pipeline"
)

// Namer is a Namer that is configured with the defaults for resources related to a Logstash resource.
Expand All @@ -34,3 +35,7 @@ func APIServiceName(name string) string {
func UserServiceName(deployName string, name string) string {
return Namer.Suffix(deployName, name)
}

func PipelineSecretName(name string) string {
return Namer.Suffix(name, pipelineSuffix)
}
13 changes: 13 additions & 0 deletions pkg/apis/logstash/v1alpha1/validations.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var (
checkSingleConfigSource,
checkMonitoring,
checkAssociations,
checkSinglePipelineSource,
}

updateChecks = []func(old, curr *Logstash) field.ErrorList{
Expand Down Expand Up @@ -73,3 +74,15 @@ func checkAssociations(l *Logstash) field.ErrorList {
err2 := commonv1.CheckAssociationRefs(monitoringPath.Child("logs"), l.GetMonitoringLogsRefs()...)
return append(err1, err2...)
}

func checkSinglePipelineSource(a *Logstash) field.ErrorList {
if a.Spec.Pipelines != nil && a.Spec.PipelinesRef != nil {
msg := "Specify at most one of [`pipelines`, `pipelinesRef`], not both"
return field.ErrorList{
field.Forbidden(field.NewPath("spec").Child("pipelines"), msg),
field.Forbidden(field.NewPath("spec").Child("pipelinesRef"), msg),
}
}

return nil
}
51 changes: 51 additions & 0 deletions pkg/apis/logstash/v1alpha1/validations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,57 @@ func Test_checkSingleConfigSource(t *testing.T) {
}
}

func Test_checkSinglePipelineSource(t *testing.T) {
tests := []struct {
name string
logstash Logstash
wantErr bool
}{
{
name: "pipelinesRef absent, pipelines present",
logstash: Logstash{
Spec: LogstashSpec{
Pipelines: []commonv1.Config{},
},
},
wantErr: false,
},
{
name: "pipelines absent, pipelinesRef present",
logstash: Logstash{
Spec: LogstashSpec{
PipelinesRef: &commonv1.ConfigSource{},
},
},
wantErr: false,
},
{
name: "neither present",
logstash: Logstash{
Spec: LogstashSpec{},
},
wantErr: false,
},
{
name: "both present",
logstash: Logstash{
Spec: LogstashSpec{
Pipelines: []commonv1.Config{},
PipelinesRef: &commonv1.ConfigSource{},
},
},
wantErr: true,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
got := checkSinglePipelineSource(&tc.logstash)
assert.Equal(t, tc.wantErr, len(got) > 0)
})
}
}

func Test_checkSupportedVersion(t *testing.T) {
for _, tt := range []struct {
name string
Expand Down
12 changes: 12 additions & 0 deletions pkg/apis/logstash/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 24 additions & 2 deletions pkg/controller/common/configref.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"context"
"fmt"

"github.com/elastic/go-ucfg"
uyaml "github.com/elastic/go-ucfg/yaml"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
Expand All @@ -34,6 +37,23 @@ func ParseConfigRef(
configRef *commonv1.ConfigSource,
secretKey string, // retrieve config data from that entry in the secret
) (*settings.CanonicalConfig, error) {
parsed, err := ParseConfigRefToConfig(driver, resource, configRef, secretKey, ConfigRefWatchName, settings.Options)
if err != nil {
return nil, err
}
return (*settings.CanonicalConfig)(parsed), nil
}

// ParseConfigRefToConfig retrieves the content of a secret referenced in `configRef`, sets up dynamic watches for that secret,
// and parses the secret content into ucfg.Config.
func ParseConfigRefToConfig(
driver driver.Interface,
resource runtime.Object, // eg. Beat, EnterpriseSearch
configRef *commonv1.ConfigSource,
secretKey string, // retrieve config data from that entry in the secret
configRefWatchName func(types.NamespacedName) string,
configOptions []ucfg.Option,
) (*ucfg.Config, error) {
resourceMeta, err := meta.Accessor(resource)
if err != nil {
return nil, err
Expand All @@ -46,7 +66,7 @@ func ParseConfigRef(
if configRef != nil && configRef.SecretName != "" {
secretNames = append(secretNames, configRef.SecretName)
}
if err := watches.WatchUserProvidedSecrets(resourceNsn, driver.DynamicWatches(), ConfigRefWatchName(resourceNsn), secretNames); err != nil {
if err := watches.WatchUserProvidedSecrets(resourceNsn, driver.DynamicWatches(), configRefWatchName(resourceNsn), secretNames); err != nil {
return nil, err
}

Expand All @@ -66,7 +86,9 @@ func ParseConfigRef(
driver.Recorder().Event(resource, corev1.EventTypeWarning, events.EventReasonUnexpected, msg)
return nil, errors.New(msg)
}
parsed, err := settings.ParseConfig(data)

parsed, err := uyaml.NewConfig(data, configOptions...)

if err != nil {
msg := fmt.Sprintf("unable to parse %s in configRef secret %s/%s", secretKey, namespace, configRef.SecretName)
driver.Recorder().Event(resource, corev1.EventTypeWarning, events.EventReasonUnexpected, msg)
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/logstash/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ func internalReconcile(params Params) (*reconciler.Results, logstashv1alpha1.Log
return results.WithError(err), params.Status
}

if err := reconcilePipeline(params, configHash); err != nil {
return results.WithError(err), params.Status
}

podTemplate := buildPodTemplate(params, configHash)
return reconcileStatefulSet(params, podTemplate)
}
2 changes: 2 additions & 0 deletions pkg/controller/logstash/logstash_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/reconciler"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/tracing"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/watches"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/pipelines"
"github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s"
logconf "github.com/elastic/cloud-on-k8s/v2/pkg/utils/log"
)
Expand Down Expand Up @@ -196,5 +197,6 @@ func (r *ReconcileLogstash) validate(ctx context.Context, logstash logstashv1alp
func (r *ReconcileLogstash) onDelete(ctx context.Context, obj types.NamespacedName) error {
r.dynamicWatches.Secrets.RemoveHandlerForKey(keystore.SecureSettingsWatchName(obj))
r.dynamicWatches.Secrets.RemoveHandlerForKey(common.ConfigRefWatchName(obj))
r.dynamicWatches.Secrets.RemoveHandlerForKey(pipelines.RefWatchName(obj))
return reconciler.GarbageCollectSoftOwnedSecrets(ctx, r.Client, obj, logstashv1alpha1.Kind)
}
83 changes: 83 additions & 0 deletions pkg/controller/logstash/pipeline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package logstash

import (
"hash"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v2/pkg/apis/logstash/v1alpha1"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/labels"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/reconciler"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/tracing"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/logstash/pipelines"
)

func reconcilePipeline(params Params, configHash hash.Hash) error {
defer tracing.Span(&params.Context)()

cfgBytes, err := buildPipeline(params)
if err != nil {
return err
}

expected := corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: params.Logstash.Namespace,
Name: logstashv1alpha1.PipelineSecretName(params.Logstash.Name),
Labels: labels.AddCredentialsLabel(NewLabels(params.Logstash)),
},
Data: map[string][]byte{
PipelineFileName: cfgBytes,
},
}

if _, err = reconciler.ReconcileSecret(params.Context, params.Client, expected, &params.Logstash); err != nil {
return err
}

_, _ = configHash.Write(cfgBytes)

return nil
}

func buildPipeline(params Params) ([]byte, error) {
userProvidedCfg, err := getUserPipeline(params)
if err != nil {
return nil, err
}

if userProvidedCfg != nil {
return userProvidedCfg.Render()
}

cfg := defaultPipeline
return cfg.Render()
}

// getUserPipeline extracts the pipeline either from the spec `pipeline` field or from the Secret referenced by spec
// `pipelineRef` field.
func getUserPipeline(params Params) (*pipelines.Config, error) {
if params.Logstash.Spec.Pipelines != nil {
pipes := make([]map[string]interface{}, 0, len(params.Logstash.Spec.Pipelines))
for _, p := range params.Logstash.Spec.Pipelines {
pipes = append(pipes, p.Data)
}

return pipelines.FromSpec(pipes)
}
return pipelines.ParsePipelinesRef(params, &params.Logstash, params.Logstash.Spec.PipelinesRef, PipelineFileName)
}

var (
defaultPipeline = pipelines.MustFromSpec([]map[string]string{
{
"pipeline.id": "main",
"path.config": "/usr/share/logstash/pipeline",
},
})
)
Loading

0 comments on commit 7ce5d9a

Please sign in to comment.