From 89d7f7894e4020c1f21c268e61b6482eea3e66b9 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Sun, 28 Feb 2021 09:46:22 -0500 Subject: [PATCH 01/13] Creating a "wrap" stage which can take labels and embed them into a log line by wrapping the log line in JSON --- pkg/logentry/stages/stage.go | 6 + pkg/logentry/stages/wrap.go | 175 +++++++++++++++ pkg/logentry/stages/wrap_test.go | 374 +++++++++++++++++++++++++++++++ 3 files changed, 555 insertions(+) create mode 100644 pkg/logentry/stages/wrap.go create mode 100644 pkg/logentry/stages/wrap_test.go diff --git a/pkg/logentry/stages/stage.go b/pkg/logentry/stages/stage.go index 82c265074ebc1..e06aeaf5ded87 100644 --- a/pkg/logentry/stages/stage.go +++ b/pkg/logentry/stages/stage.go @@ -28,6 +28,7 @@ const ( StageTypeTenant = "tenant" StageTypeDrop = "drop" StageTypeMultiline = "multiline" + StageTypeWrap = "wrap" ) // Processor takes an existing set of labels, timestamp and log entry and returns either a possibly mutated @@ -145,6 +146,11 @@ func New(logger log.Logger, jobName *string, stageType string, if err != nil { return nil, err } + case StageTypeWrap: + s, err = newWrapStage(logger, cfg, registerer) + if err != nil { + return nil, err + } default: return nil, errors.Errorf("Unknown stage type: %s", stageType) } diff --git a/pkg/logentry/stages/wrap.go b/pkg/logentry/stages/wrap.go new file mode 100644 index 0000000000000..7a75023946c25 --- /dev/null +++ b/pkg/logentry/stages/wrap.go @@ -0,0 +1,175 @@ +package stages + +import ( + "errors" + "time" + + "github.com/go-kit/kit/log" + json "github.com/json-iterator/go" + "github.com/mitchellh/mapstructure" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" +) + +const () + +var () + +type Wrapped struct { + Labels map[string]string `json:",inline"` + Entry string `json:"_entry"` +} + +// UnmarshalJSON populates a Wrapped struct where every key except the _entry key is added to the Labels field +func (w *Wrapped) UnmarshalJSON(data []byte) error { + m := &map[string]interface{}{} + err := json.Unmarshal(data, m) + if err != nil { + return err + } + w.Labels = map[string]string{} + for k, v := range *m { + // _entry key goes to the Entry field, everything else becomes a label + if k == "_entry" { + if s, ok := v.(string); ok { + w.Entry = s + } else { + return errors.New("failed to unmarshal json, all values must be of type string") + } + } else { + if s, ok := v.(string); ok { + w.Labels[k] = s + } else { + return errors.New("failed to unmarshal json, all values must be of type string") + } + } + } + return nil +} + +// MarshalJSON creates a Wrapped struct as JSON where the Labels are flattened into the top level of the object +func (p Wrapped) MarshalJSON() ([]byte, error) { + + // Marshal the entry to properly escape if it's json or contains quotes + b, err := json.Marshal(p.Entry) + if err != nil { + return nil, err + } + + // Create a map and set the already marshalled line entry + m := map[string]json.RawMessage{ + "_entry": b, + } + + // Add labels to the map, we do this at the top level to make querying more intuitive and easier. + for k, v := range p.Labels { + // Also marshal the label values to properly escape them as well + lv, err := json.Marshal(v) + if err != nil { + return nil, err + } + m[k] = lv + } + + return json.Marshal(m) +} + +// WrapConfig contains the configuration for a wrapStage +type WrapConfig struct { + Labels []string `mapstrcuture:"labels"` + IngestTimestamp *bool `mapstructure:"ingest_timestamp"` +} + +// validateWrapConfig validates the WrapConfig for the wrapStage +func validateWrapConfig(cfg *WrapConfig) error { + + return nil +} + +// newWrapStage creates a DropStage from config +func newWrapStage(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error) { + cfg := &WrapConfig{} + err := mapstructure.WeakDecode(config, cfg) + if err != nil { + return nil, err + } + err = validateWrapConfig(cfg) + if err != nil { + return nil, err + } + + return &wrapStage{ + logger: log.With(logger, "component", "stage", "type", "wrap"), + cfg: cfg, + dropCount: getDropCountMetric(registerer), + }, nil +} + +// wrapStage applies Label matchers to determine if the include stages should be run +type wrapStage struct { + logger log.Logger + cfg *WrapConfig + dropCount *prometheus.CounterVec +} + +func (m *wrapStage) Run(in chan Entry) chan Entry { + out := make(chan Entry) + go func() { + defer close(out) + for e := range in { + out <- m.wrap(e) + } + }() + return out +} + +func (m *wrapStage) wrap(e Entry) Entry { + lbls := e.Labels + wrappedLabels := make(map[string]string, len(m.cfg.Labels)) + foundLables := []model.LabelName{} + + // Iterate through all the labels and extract any that match our list of labels to embed + for lk, lv := range lbls { + for _, wl := range m.cfg.Labels { + if string(lk) == wl { + wrappedLabels[wl] = string(lv) + foundLables = append(foundLables, lk) + } + } + } + + // TODO also iterate through extracted map? + + // Remove the found labels from the entry labels + for _, fl := range foundLables { + delete(lbls, fl) + } + + // Embed the extracted labels into the wrapper object + w := Wrapped{ + Labels: wrappedLabels, + Entry: e.Line, + } + + // Marshal to json + wl, err := json.Marshal(w) + if err != nil { + + } + + // Replace the labels and the line with new values + e.Labels = lbls + e.Line = string(wl) + + // If the config says to re-write the timestamp to the ingested time, do that now + if m.cfg.IngestTimestamp != nil && *m.cfg.IngestTimestamp { + e.Timestamp = time.Now() + } + + return e +} + +// Name implements Stage +func (m *wrapStage) Name() string { + return StageTypeWrap +} diff --git a/pkg/logentry/stages/wrap_test.go b/pkg/logentry/stages/wrap_test.go new file mode 100644 index 0000000000000..38bf310835fa6 --- /dev/null +++ b/pkg/logentry/stages/wrap_test.go @@ -0,0 +1,374 @@ +package stages + +import ( + "encoding/json" + "errors" + "fmt" + "testing" + "time" + + util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + ww "github.com/weaveworks/common/server" +) + +// Not all these are tested but are here to make sure the different types marshal without error +var testWrapYaml = ` +pipeline_stages: +- wrap: + labels: + - pod + - container + ingest_timestamp: false +` + +func Test_wrapStage_Process(t *testing.T) { + // Enable debug logging + cfg := &ww.Config{} + require.Nil(t, cfg.LogLevel.Set("debug")) + util_log.InitLogger(cfg) + Debug = true + + tests := []struct { + name string + config *DropConfig + labels model.LabelSet + extracted map[string]interface{} + t time.Time + entry string + shouldDrop bool + }{ + { + name: "Longer Than Should Drop", + config: &DropConfig{ + LongerThan: ptrFromString("10b"), + }, + labels: model.LabelSet{}, + extracted: map[string]interface{}{}, + entry: "12345678901", + shouldDrop: true, + }, + { + name: "Longer Than Should Not Drop When Equal", + config: &DropConfig{ + LongerThan: ptrFromString("10b"), + }, + labels: model.LabelSet{}, + extracted: map[string]interface{}{}, + entry: "1234567890", + shouldDrop: false, + }, + { + name: "Longer Than Should Not Drop When Less", + config: &DropConfig{ + LongerThan: ptrFromString("10b"), + }, + labels: model.LabelSet{}, + extracted: map[string]interface{}{}, + entry: "123456789", + shouldDrop: false, + }, + { + name: "Older than Should Drop", + config: &DropConfig{ + OlderThan: ptrFromString("1h"), + }, + labels: model.LabelSet{}, + extracted: map[string]interface{}{}, + t: time.Now().Add(-2 * time.Hour), + shouldDrop: true, + }, + { + name: "Older than Should Not Drop", + config: &DropConfig{ + OlderThan: ptrFromString("1h"), + }, + labels: model.LabelSet{}, + extracted: map[string]interface{}{}, + t: time.Now().Add(-5 * time.Minute), + shouldDrop: false, + }, + { + name: "Matched Source", + config: &DropConfig{ + Source: ptrFromString("key"), + }, + labels: model.LabelSet{}, + extracted: map[string]interface{}{ + "key": "", + }, + shouldDrop: true, + }, + { + name: "Did not match Source", + config: &DropConfig{ + Source: ptrFromString("key1"), + }, + labels: model.LabelSet{}, + extracted: map[string]interface{}{ + "key": "val1", + }, + shouldDrop: false, + }, + { + name: "Matched Source and Value", + config: &DropConfig{ + Source: ptrFromString("key"), + Value: ptrFromString("val1"), + }, + labels: model.LabelSet{}, + extracted: map[string]interface{}{ + "key": "val1", + }, + shouldDrop: true, + }, + { + name: "Did not match Source and Value", + config: &DropConfig{ + Source: ptrFromString("key"), + Value: ptrFromString("val1"), + }, + labels: model.LabelSet{}, + extracted: map[string]interface{}{ + "key": "VALRUE1", + }, + shouldDrop: false, + }, + { + name: "Regex Matched Source and Value", + config: &DropConfig{ + Source: ptrFromString("key"), + Expression: ptrFromString(".*val.*"), + }, + labels: model.LabelSet{}, + extracted: map[string]interface{}{ + "key": "val1", + }, + shouldDrop: true, + }, + { + name: "Regex Did not match Source and Value", + config: &DropConfig{ + Source: ptrFromString("key"), + Expression: ptrFromString(".*val.*"), + }, + labels: model.LabelSet{}, + extracted: map[string]interface{}{ + "key": "pal1", + }, + shouldDrop: false, + }, + { + name: "Regex No Matching Source", + config: &DropConfig{ + Source: ptrFromString("key"), + Expression: ptrFromString(".*val.*"), + }, + labels: model.LabelSet{}, + extracted: map[string]interface{}{ + "pokey": "pal1", + }, + shouldDrop: false, + }, + { + name: "Regex Did Not Match Line", + config: &DropConfig{ + Expression: ptrFromString(".*val.*"), + }, + labels: model.LabelSet{}, + entry: "this is a line which does not match the regex", + extracted: map[string]interface{}{}, + shouldDrop: false, + }, + { + name: "Regex Matched Line", + config: &DropConfig{ + Expression: ptrFromString(".*val.*"), + }, + labels: model.LabelSet{}, + entry: "this is a line with the word value in it", + extracted: map[string]interface{}{}, + shouldDrop: true, + }, + { + name: "Match Source and Length Both Match", + config: &DropConfig{ + Source: ptrFromString("key"), + LongerThan: ptrFromString("10b"), + }, + labels: model.LabelSet{}, + extracted: map[string]interface{}{ + "key": "pal1", + }, + entry: "12345678901", + shouldDrop: true, + }, + { + name: "Match Source and Length Only First Matches", + config: &DropConfig{ + Source: ptrFromString("key"), + LongerThan: ptrFromString("10b"), + }, + labels: model.LabelSet{}, + extracted: map[string]interface{}{ + "key": "pal1", + }, + entry: "123456789", + shouldDrop: false, + }, + { + name: "Match Source and Length Only Second Matches", + config: &DropConfig{ + Source: ptrFromString("key"), + LongerThan: ptrFromString("10b"), + }, + labels: model.LabelSet{}, + extracted: map[string]interface{}{ + "WOOOOOOOOOOOOOO": "pal1", + }, + entry: "123456789012", + shouldDrop: false, + }, + { + name: "Everything Must Match", + config: &DropConfig{ + Source: ptrFromString("key"), + Expression: ptrFromString(".*val.*"), + OlderThan: ptrFromString("1h"), + LongerThan: ptrFromString("10b"), + }, + labels: model.LabelSet{}, + extracted: map[string]interface{}{ + "key": "must contain value to match", + }, + t: time.Now().Add(-2 * time.Hour), + entry: "12345678901", + shouldDrop: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := validateDropConfig(tt.config) + if err != nil { + t.Error(err) + } + m, err := newDropStage(util_log.Logger, tt.config, prometheus.DefaultRegisterer) + require.NoError(t, err) + out := processEntries(m, newEntry(tt.extracted, tt.labels, tt.entry, tt.t)) + if tt.shouldDrop { + assert.Len(t, out, 0) + } else { + assert.Len(t, out, 1) + } + }) + } +} + +// TestDropPipeline is used to verify we properly parse the yaml config and create a working pipeline +func TestWrapPipeline(t *testing.T) { + registry := prometheus.NewRegistry() + plName := "test_pipeline" + pl, err := NewPipeline(util_log.Logger, loadConfig(testWrapYaml), &plName, registry) + require.NoError(t, err) + + l1Lbls := model.LabelSet{ + "pod": "foo-xsfs3", + "container": "foo", + "namespace": "dev", + "cluster": "us-eu-1", + } + + l2Lbls := model.LabelSet{ + "pod": "foo-vvsdded", + "container": "bar", + "namespace": "dev", + "cluster": "us-eu-1", + } + + testTime := time.Now() + + out := processEntries(pl, + newEntry(nil, l1Lbls, testMatchLogLineApp1, testTime), + newEntry(nil, l2Lbls, testMatchLogLineApp2, testTime), + ) + + // Both lines should succeed + assert.Len(t, out, 2) + + // Expected labels should remove the wrapped labels + expectedLbls := model.LabelSet{ + "namespace": "dev", + "cluster": "us-eu-1", + } + assert.Equal(t, expectedLbls, out[0].Labels) + assert.Equal(t, expectedLbls, out[1].Labels) + + //Unmarshal the wrapped object + w := &Wrapped{} + assert.NoError(t, json.Unmarshal([]byte(out[0].Entry.Entry.Line), w)) + expectedWrappedLbls := map[string]string{ + "pod": "foo-xsfs3", + "container": "foo", + } + assert.Equal(t, expectedWrappedLbls, w.Labels) + assert.Equal(t, testMatchLogLineApp1, w.Entry) + + //assert.Equal(t, out[0].Line, testMatchLogLineApp2) +} + +func Test_validateWrapConfig(t *testing.T) { + tests := []struct { + name string + config *DropConfig + wantErr error + }{ + { + name: "ErrEmpty", + config: &DropConfig{}, + wantErr: errors.New(ErrDropStageEmptyConfig), + }, + { + name: "Invalid Duration", + config: &DropConfig{ + OlderThan: &dropInvalidDur, + }, + wantErr: fmt.Errorf( + ErrDropStageInvalidDuration, + dropInvalidDur, + `time: unknown unit "y" in duration "10y"`, + ), + }, + { + name: "Invalid Config", + config: &DropConfig{ + Value: &dropVal, + Expression: &dropRegex, + }, + wantErr: errors.New(ErrDropStageInvalidConfig), + }, + { + name: "Invalid Regex", + config: &DropConfig{ + Expression: &dropInvalidRegex, + }, + wantErr: fmt.Errorf(ErrDropStageInvalidRegex, "error parsing regexp: invalid named capture: `(?P Date: Sun, 28 Feb 2021 12:54:19 -0500 Subject: [PATCH 02/13] improve tests, deterministic ordering of json --- pkg/logentry/stages/wrap.go | 96 ++++-- pkg/logentry/stages/wrap_test.go | 493 +++++++++++++------------------ 2 files changed, 281 insertions(+), 308 deletions(-) diff --git a/pkg/logentry/stages/wrap.go b/pkg/logentry/stages/wrap.go index 7a75023946c25..fcd97c747842a 100644 --- a/pkg/logentry/stages/wrap.go +++ b/pkg/logentry/stages/wrap.go @@ -1,19 +1,29 @@ package stages import ( + "bytes" "errors" + "fmt" + "reflect" + "sort" "time" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" json "github.com/json-iterator/go" "github.com/mitchellh/mapstructure" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" ) -const () +const ( + entryKey = "_entry" +) -var () +var ( + reallyTrue = true + reallyFalse = false +) type Wrapped struct { Labels map[string]string `json:",inline"` @@ -30,7 +40,7 @@ func (w *Wrapped) UnmarshalJSON(data []byte) error { w.Labels = map[string]string{} for k, v := range *m { // _entry key goes to the Entry field, everything else becomes a label - if k == "_entry" { + if k == entryKey { if s, ok := v.(string); ok { w.Entry = s } else { @@ -56,22 +66,46 @@ func (p Wrapped) MarshalJSON() ([]byte, error) { return nil, err } - // Create a map and set the already marshalled line entry - m := map[string]json.RawMessage{ - "_entry": b, + // Creating a map and marshalling from a map results in a non deterministic ordering of the resulting json object + // This is functionally ok but really annoying to humans and automated tests. + // Instead we will build the json ourselves after sorting all the labels to get a consistent output + keys := make([]string, 0, len(p.Labels)) + for k := range p.Labels { + keys = append(keys, k) } + sort.Strings(keys) + + var buf bytes.Buffer - // Add labels to the map, we do this at the top level to make querying more intuitive and easier. - for k, v := range p.Labels { - // Also marshal the label values to properly escape them as well - lv, err := json.Marshal(v) + buf.WriteString("{") + for i, k := range keys { + if i != 0 { + buf.WriteString(",") + } + // marshal key + key, err := json.Marshal(k) + if err != nil { + return nil, err + } + buf.Write(key) + buf.WriteString(":") + // marshal value + val, err := json.Marshal(p.Labels[k]) if err != nil { return nil, err } - m[k] = lv + buf.Write(val) + } + // Only add the comma if something exists in the buffer other than "{" + if buf.Len() > 1 { + buf.WriteString(",") } + // Add the line entry + buf.WriteString("\"" + entryKey + "\":") + buf.Write(b) - return json.Marshal(m) + buf.WriteString("}") + return buf.Bytes(), nil } // WrapConfig contains the configuration for a wrapStage @@ -82,7 +116,10 @@ type WrapConfig struct { // validateWrapConfig validates the WrapConfig for the wrapStage func validateWrapConfig(cfg *WrapConfig) error { - + // Default the IngestTimestamp value to be true + if cfg.IngestTimestamp == nil { + cfg.IngestTimestamp = &reallyTrue + } return nil } @@ -128,23 +165,23 @@ func (m *wrapStage) wrap(e Entry) Entry { wrappedLabels := make(map[string]string, len(m.cfg.Labels)) foundLables := []model.LabelName{} - // Iterate through all the labels and extract any that match our list of labels to embed - for lk, lv := range lbls { + // Iterate through all the extracted map (which also includes all the labels) + for lk, lv := range e.Extracted { for _, wl := range m.cfg.Labels { - if string(lk) == wl { - wrappedLabels[wl] = string(lv) - foundLables = append(foundLables, lk) + if lk == wl { + sv, err := getString(lv) + if err != nil { + if Debug { + level.Debug(m.logger).Log("msg", fmt.Sprintf("value for key: '%s' cannot be converted to a string and cannot be wrapped", lk), "err", err, "type", reflect.TypeOf(lv)) + } + continue + } + wrappedLabels[wl] = sv + foundLables = append(foundLables, model.LabelName(lk)) } } } - // TODO also iterate through extracted map? - - // Remove the found labels from the entry labels - for _, fl := range foundLables { - delete(lbls, fl) - } - // Embed the extracted labels into the wrapper object w := Wrapped{ Labels: wrappedLabels, @@ -154,7 +191,16 @@ func (m *wrapStage) wrap(e Entry) Entry { // Marshal to json wl, err := json.Marshal(w) if err != nil { + if Debug { + level.Debug(m.logger).Log("msg", fmt.Sprintf("wrap stage failed to marshal wrapped object to json, wrapping will be skipped"), "err", err) + } + return e + } + // Remove anything found which is also a label, do this after the marshalling to not remove labels until + // we are sure the line can be successfully wrapped. + for _, fl := range foundLables { + delete(lbls, fl) } // Replace the labels and the line with new values diff --git a/pkg/logentry/stages/wrap_test.go b/pkg/logentry/stages/wrap_test.go index 38bf310835fa6..9a85786a75032 100644 --- a/pkg/logentry/stages/wrap_test.go +++ b/pkg/logentry/stages/wrap_test.go @@ -1,272 +1,42 @@ package stages import ( - "encoding/json" - "errors" - "fmt" "testing" "time" util_log "github.com/cortexproject/cortex/pkg/util/log" + json "github.com/json-iterator/go" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ww "github.com/weaveworks/common/server" + + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/promtail/api" ) // Not all these are tested but are here to make sure the different types marshal without error var testWrapYaml = ` pipeline_stages: -- wrap: - labels: - - pod - - container - ingest_timestamp: false +- match: + selector: "{container=\"foo\"}" + stages: + - wrap: + labels: + - pod + - container + ingest_timestamp: false +- match: + selector: "{container=\"bar\"}" + stages: + - wrap: + labels: + - pod + - container + ingest_timestamp: true ` -func Test_wrapStage_Process(t *testing.T) { - // Enable debug logging - cfg := &ww.Config{} - require.Nil(t, cfg.LogLevel.Set("debug")) - util_log.InitLogger(cfg) - Debug = true - - tests := []struct { - name string - config *DropConfig - labels model.LabelSet - extracted map[string]interface{} - t time.Time - entry string - shouldDrop bool - }{ - { - name: "Longer Than Should Drop", - config: &DropConfig{ - LongerThan: ptrFromString("10b"), - }, - labels: model.LabelSet{}, - extracted: map[string]interface{}{}, - entry: "12345678901", - shouldDrop: true, - }, - { - name: "Longer Than Should Not Drop When Equal", - config: &DropConfig{ - LongerThan: ptrFromString("10b"), - }, - labels: model.LabelSet{}, - extracted: map[string]interface{}{}, - entry: "1234567890", - shouldDrop: false, - }, - { - name: "Longer Than Should Not Drop When Less", - config: &DropConfig{ - LongerThan: ptrFromString("10b"), - }, - labels: model.LabelSet{}, - extracted: map[string]interface{}{}, - entry: "123456789", - shouldDrop: false, - }, - { - name: "Older than Should Drop", - config: &DropConfig{ - OlderThan: ptrFromString("1h"), - }, - labels: model.LabelSet{}, - extracted: map[string]interface{}{}, - t: time.Now().Add(-2 * time.Hour), - shouldDrop: true, - }, - { - name: "Older than Should Not Drop", - config: &DropConfig{ - OlderThan: ptrFromString("1h"), - }, - labels: model.LabelSet{}, - extracted: map[string]interface{}{}, - t: time.Now().Add(-5 * time.Minute), - shouldDrop: false, - }, - { - name: "Matched Source", - config: &DropConfig{ - Source: ptrFromString("key"), - }, - labels: model.LabelSet{}, - extracted: map[string]interface{}{ - "key": "", - }, - shouldDrop: true, - }, - { - name: "Did not match Source", - config: &DropConfig{ - Source: ptrFromString("key1"), - }, - labels: model.LabelSet{}, - extracted: map[string]interface{}{ - "key": "val1", - }, - shouldDrop: false, - }, - { - name: "Matched Source and Value", - config: &DropConfig{ - Source: ptrFromString("key"), - Value: ptrFromString("val1"), - }, - labels: model.LabelSet{}, - extracted: map[string]interface{}{ - "key": "val1", - }, - shouldDrop: true, - }, - { - name: "Did not match Source and Value", - config: &DropConfig{ - Source: ptrFromString("key"), - Value: ptrFromString("val1"), - }, - labels: model.LabelSet{}, - extracted: map[string]interface{}{ - "key": "VALRUE1", - }, - shouldDrop: false, - }, - { - name: "Regex Matched Source and Value", - config: &DropConfig{ - Source: ptrFromString("key"), - Expression: ptrFromString(".*val.*"), - }, - labels: model.LabelSet{}, - extracted: map[string]interface{}{ - "key": "val1", - }, - shouldDrop: true, - }, - { - name: "Regex Did not match Source and Value", - config: &DropConfig{ - Source: ptrFromString("key"), - Expression: ptrFromString(".*val.*"), - }, - labels: model.LabelSet{}, - extracted: map[string]interface{}{ - "key": "pal1", - }, - shouldDrop: false, - }, - { - name: "Regex No Matching Source", - config: &DropConfig{ - Source: ptrFromString("key"), - Expression: ptrFromString(".*val.*"), - }, - labels: model.LabelSet{}, - extracted: map[string]interface{}{ - "pokey": "pal1", - }, - shouldDrop: false, - }, - { - name: "Regex Did Not Match Line", - config: &DropConfig{ - Expression: ptrFromString(".*val.*"), - }, - labels: model.LabelSet{}, - entry: "this is a line which does not match the regex", - extracted: map[string]interface{}{}, - shouldDrop: false, - }, - { - name: "Regex Matched Line", - config: &DropConfig{ - Expression: ptrFromString(".*val.*"), - }, - labels: model.LabelSet{}, - entry: "this is a line with the word value in it", - extracted: map[string]interface{}{}, - shouldDrop: true, - }, - { - name: "Match Source and Length Both Match", - config: &DropConfig{ - Source: ptrFromString("key"), - LongerThan: ptrFromString("10b"), - }, - labels: model.LabelSet{}, - extracted: map[string]interface{}{ - "key": "pal1", - }, - entry: "12345678901", - shouldDrop: true, - }, - { - name: "Match Source and Length Only First Matches", - config: &DropConfig{ - Source: ptrFromString("key"), - LongerThan: ptrFromString("10b"), - }, - labels: model.LabelSet{}, - extracted: map[string]interface{}{ - "key": "pal1", - }, - entry: "123456789", - shouldDrop: false, - }, - { - name: "Match Source and Length Only Second Matches", - config: &DropConfig{ - Source: ptrFromString("key"), - LongerThan: ptrFromString("10b"), - }, - labels: model.LabelSet{}, - extracted: map[string]interface{}{ - "WOOOOOOOOOOOOOO": "pal1", - }, - entry: "123456789012", - shouldDrop: false, - }, - { - name: "Everything Must Match", - config: &DropConfig{ - Source: ptrFromString("key"), - Expression: ptrFromString(".*val.*"), - OlderThan: ptrFromString("1h"), - LongerThan: ptrFromString("10b"), - }, - labels: model.LabelSet{}, - extracted: map[string]interface{}{ - "key": "must contain value to match", - }, - t: time.Now().Add(-2 * time.Hour), - entry: "12345678901", - shouldDrop: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - err := validateDropConfig(tt.config) - if err != nil { - t.Error(err) - } - m, err := newDropStage(util_log.Logger, tt.config, prometheus.DefaultRegisterer) - require.NoError(t, err) - out := processEntries(m, newEntry(tt.extracted, tt.labels, tt.entry, tt.t)) - if tt.shouldDrop { - assert.Len(t, out, 0) - } else { - assert.Len(t, out, 1) - } - }) - } -} - // TestDropPipeline is used to verify we properly parse the yaml config and create a working pipeline func TestWrapPipeline(t *testing.T) { registry := prometheus.NewRegistry() @@ -292,7 +62,7 @@ func TestWrapPipeline(t *testing.T) { out := processEntries(pl, newEntry(nil, l1Lbls, testMatchLogLineApp1, testTime), - newEntry(nil, l2Lbls, testMatchLogLineApp2, testTime), + newEntry(nil, l2Lbls, testRegexLogLine, testTime), ) // Both lines should succeed @@ -306,7 +76,13 @@ func TestWrapPipeline(t *testing.T) { assert.Equal(t, expectedLbls, out[0].Labels) assert.Equal(t, expectedLbls, out[1].Labels) - //Unmarshal the wrapped object + // Validate timestamps + // Line 1 should use the first matcher and should use the log line timestamp + assert.Equal(t, testTime, out[0].Timestamp) + // Line 2 should use the second matcher and should get timestamp by the wrap stage + assert.True(t, out[1].Timestamp.After(testTime)) + + // Unmarshal the wrapped object and validate line1 w := &Wrapped{} assert.NoError(t, json.Unmarshal([]byte(out[0].Entry.Entry.Line), w)) expectedWrappedLbls := map[string]string{ @@ -316,59 +92,210 @@ func TestWrapPipeline(t *testing.T) { assert.Equal(t, expectedWrappedLbls, w.Labels) assert.Equal(t, testMatchLogLineApp1, w.Entry) - //assert.Equal(t, out[0].Line, testMatchLogLineApp2) + // Validate line 2 + w = &Wrapped{} + assert.NoError(t, json.Unmarshal([]byte(out[1].Entry.Entry.Line), w)) + expectedWrappedLbls = map[string]string{ + "pod": "foo-vvsdded", + "container": "bar", + } + assert.Equal(t, expectedWrappedLbls, w.Labels) + assert.Equal(t, testRegexLogLine, w.Entry) } -func Test_validateWrapConfig(t *testing.T) { +func Test_wrapStage_Run(t *testing.T) { + // Enable debug logging + cfg := &ww.Config{} + require.Nil(t, cfg.LogLevel.Set("debug")) + util_log.InitLogger(cfg) + Debug = true + tests := []struct { - name string - config *DropConfig - wantErr error + name string + config *WrapConfig + inputEntry Entry + expectedEntry Entry }{ { - name: "ErrEmpty", - config: &DropConfig{}, - wantErr: errors.New(ErrDropStageEmptyConfig), + name: "no supplied labels list", + config: &WrapConfig{ + Labels: nil, + IngestTimestamp: &reallyFalse, + }, + inputEntry: Entry{ + Extracted: map[string]interface{}{}, + Entry: api.Entry{ + Labels: model.LabelSet{ + "foo": "bar", + "bar": "baz", + }, + Entry: logproto.Entry{ + Timestamp: time.Unix(1, 0), + Line: "test line 1", + }, + }, + }, + expectedEntry: Entry{ + Entry: api.Entry{ + Labels: model.LabelSet{ + "foo": "bar", + "bar": "baz", + }, + Entry: logproto.Entry{ + Timestamp: time.Unix(1, 0), + Line: "{\"" + entryKey + "\":\"test line 1\"}", + }, + }, + }, }, { - name: "Invalid Duration", - config: &DropConfig{ - OlderThan: &dropInvalidDur, + name: "match one supplied label", + config: &WrapConfig{ + Labels: []string{"foo"}, + IngestTimestamp: &reallyFalse, + }, + inputEntry: Entry{ + Extracted: map[string]interface{}{}, + Entry: api.Entry{ + Labels: model.LabelSet{ + "foo": "bar", + "bar": "baz", + }, + Entry: logproto.Entry{ + Timestamp: time.Unix(1, 0), + Line: "test line 1", + }, + }, + }, + expectedEntry: Entry{ + Entry: api.Entry{ + Labels: model.LabelSet{ + "bar": "baz", + }, + Entry: logproto.Entry{ + Timestamp: time.Unix(1, 0), + Line: "{\"foo\":\"bar\",\"" + entryKey + "\":\"test line 1\"}", + }, + }, }, - wantErr: fmt.Errorf( - ErrDropStageInvalidDuration, - dropInvalidDur, - `time: unknown unit "y" in duration "10y"`, - ), }, { - name: "Invalid Config", - config: &DropConfig{ - Value: &dropVal, - Expression: &dropRegex, + name: "match all supplied labels", + config: &WrapConfig{ + Labels: []string{"foo", "bar"}, + IngestTimestamp: &reallyFalse, + }, + inputEntry: Entry{ + Extracted: map[string]interface{}{}, + Entry: api.Entry{ + Labels: model.LabelSet{ + "foo": "bar", + "bar": "baz", + }, + Entry: logproto.Entry{ + Timestamp: time.Unix(1, 0), + Line: "test line 1", + }, + }, + }, + expectedEntry: Entry{ + Entry: api.Entry{ + Labels: model.LabelSet{}, + Entry: logproto.Entry{ + Timestamp: time.Unix(1, 0), + Line: "{\"bar\":\"baz\",\"foo\":\"bar\",\"" + entryKey + "\":\"test line 1\"}", + }, + }, }, - wantErr: errors.New(ErrDropStageInvalidConfig), }, { - name: "Invalid Regex", - config: &DropConfig{ - Expression: &dropInvalidRegex, + name: "match extracted map and labels", + config: &WrapConfig{ + Labels: []string{"foo", "extr1"}, + IngestTimestamp: &reallyFalse, + }, + inputEntry: Entry{ + Extracted: map[string]interface{}{ + "extr1": "etr1val", + "extr2": "etr2val", + }, + Entry: api.Entry{ + Labels: model.LabelSet{ + "foo": "bar", + "bar": "baz", + }, + Entry: logproto.Entry{ + Timestamp: time.Unix(1, 0), + Line: "test line 1", + }, + }, + }, + expectedEntry: Entry{ + Entry: api.Entry{ + Labels: model.LabelSet{ + "bar": "baz", + }, + Entry: logproto.Entry{ + Timestamp: time.Unix(1, 0), + Line: "{\"extr1\":\"etr1val\",\"foo\":\"bar\",\"" + entryKey + "\":\"test line 1\"}", + }, + }, }, - wantErr: fmt.Errorf(ErrDropStageInvalidRegex, "error parsing regexp: invalid named capture: `(?P Date: Sun, 28 Feb 2021 12:58:46 -0500 Subject: [PATCH 03/13] add test for quotes --- pkg/logentry/stages/wrap_test.go | 34 ++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/pkg/logentry/stages/wrap_test.go b/pkg/logentry/stages/wrap_test.go index 9a85786a75032..71db160141724 100644 --- a/pkg/logentry/stages/wrap_test.go +++ b/pkg/logentry/stages/wrap_test.go @@ -276,6 +276,40 @@ func Test_wrapStage_Run(t *testing.T) { }, }, }, + { + name: "escape quotes", + config: &WrapConfig{ + Labels: []string{"foo", "ex\"tr2"}, + IngestTimestamp: &reallyFalse, + }, + inputEntry: Entry{ + Extracted: map[string]interface{}{ + "extr1": "etr1val", + "ex\"tr2": `"fd"`, + }, + Entry: api.Entry{ + Labels: model.LabelSet{ + "foo": "bar", + "bar": "baz", + }, + Entry: logproto.Entry{ + Timestamp: time.Unix(1, 0), + Line: "test line 1", + }, + }, + }, + expectedEntry: Entry{ + Entry: api.Entry{ + Labels: model.LabelSet{ + "bar": "baz", + }, + Entry: logproto.Entry{ + Timestamp: time.Unix(1, 0), + Line: "{\"ex\\\"tr2\":\"\\\"fd\\\"\",\"foo\":\"bar\",\"" + entryKey + "\":\"test line 1\"}", + }, + }, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { From c7a9064328bfe645b006b36baac49b23c0d27eff Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Sun, 28 Feb 2021 13:04:53 -0500 Subject: [PATCH 04/13] add test for ingest timestamp --- pkg/logentry/stages/wrap_test.go | 39 +++++++++++++++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/pkg/logentry/stages/wrap_test.go b/pkg/logentry/stages/wrap_test.go index 71db160141724..3816b6eb4fd53 100644 --- a/pkg/logentry/stages/wrap_test.go +++ b/pkg/logentry/stages/wrap_test.go @@ -310,6 +310,38 @@ func Test_wrapStage_Run(t *testing.T) { }, }, }, + { + name: "ingest timestamp", + config: &WrapConfig{ + Labels: nil, + IngestTimestamp: &reallyTrue, + }, + inputEntry: Entry{ + Extracted: map[string]interface{}{}, + Entry: api.Entry{ + Labels: model.LabelSet{ + "foo": "bar", + "bar": "baz", + }, + Entry: logproto.Entry{ + Timestamp: time.Unix(1, 0), + Line: "test line 1", + }, + }, + }, + expectedEntry: Entry{ + Entry: api.Entry{ + Labels: model.LabelSet{ + "foo": "bar", + "bar": "baz", + }, + Entry: logproto.Entry{ + Timestamp: time.Unix(1, 0), // Ignored in test execution below + Line: "{\"" + entryKey + "\":\"test line 1\"}", + }, + }, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -329,7 +361,12 @@ func Test_wrapStage_Run(t *testing.T) { // so there is no reason to verify it assert.Equal(t, tt.expectedEntry.Labels, out[0].Labels) assert.Equal(t, tt.expectedEntry.Line, out[0].Line) - assert.Equal(t, tt.expectedEntry.Timestamp, out[0].Timestamp) + if *tt.config.IngestTimestamp { + assert.True(t, out[0].Timestamp.After(tt.inputEntry.Timestamp)) + } else { + assert.Equal(t, tt.expectedEntry.Timestamp, out[0].Timestamp) + } + }) } } From 08a8624c2d4686fa4d6c7dbc64c0f87721538edf Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Sun, 28 Feb 2021 13:28:32 -0500 Subject: [PATCH 05/13] update docs --- .../sources/clients/promtail/stages/_index.md | 1 + docs/sources/clients/promtail/stages/wrap.md | 82 +++++++++++++++++++ 2 files changed, 83 insertions(+) create mode 100644 docs/sources/clients/promtail/stages/wrap.md diff --git a/docs/sources/clients/promtail/stages/_index.md b/docs/sources/clients/promtail/stages/_index.md index 1ee91611b13b1..b47208ed6662a 100644 --- a/docs/sources/clients/promtail/stages/_index.md +++ b/docs/sources/clients/promtail/stages/_index.md @@ -17,6 +17,7 @@ Parsing stages: Transform stages: - [template](template/): Use Go templates to modify extracted data. + - [wrap](wrap/): Wraps a log line in a JSON object allowing extracted values and labels to be placed inside the log line. Action stages: diff --git a/docs/sources/clients/promtail/stages/wrap.md b/docs/sources/clients/promtail/stages/wrap.md new file mode 100644 index 0000000000000..40832cf6848d8 --- /dev/null +++ b/docs/sources/clients/promtail/stages/wrap.md @@ -0,0 +1,82 @@ +--- +title: wrap +--- +# `wrap` stage + +The `wrap` stage is a transform stage which lets you embed extracted values and labels into the log line by wrapping the log line in a JSON object. + +For example, if you wanted to remove the labels `container` and `pod` but still wanted to keep their values you could use this stage to create the following output: + +```json +{ + "container": "myapp", + "pod": "pod-3223f", + "_entry": "original log message" +} +``` + +This stage is useful if you have some label or other metadata you would like to keep but it doesn't make a good label (isn't useful for querying or is too high cardinality) + +The querying capabilities of Loki make it easy to still access this data and filter/aggregate on it at query time. + +## Wrap stage schema + +```yaml +wrap: + # Name from extracted data and/or line labels + # Labels provided here are automatically removed from the output labels. + labels: + - [] + + # If the resulting log line should use any existing timestamp or use time.Now() when the line was processed. + # To avoid out of order issues with Loki, when combining several log streams (separate source files) into one + # you will want to set a new timestamp on the log line, `ingest_timestamp: true` + # If you are not combining multiple source files or you know your log lines won't have interlaced timestamps + # you can set this value to false. + [ingest_timestamp: | default = true] +``` + +## Examples + +Removing the container label and embed it into the log line (Kubernetes pods could have multiple containers) + +```yaml +wrap: + labels: + - container +``` + +This would create a log line + +```json +{ + "container": "myapp", + "_entry": "original log message" +} +``` + +Loki 2.0 has some tools to make querying wrapped log lines easier as well. + +Display the log line as if it were never wrapped: + +``` +{cluster="us-central1", job="myjob"} | json | line_format "{{._entry}}" +``` + +Use the wrapped labels for filtering: + +``` +{cluster="us-central1", job="myjob"} | json | container="myapp" | line_format "{{._entry}}" +``` + +You can even use the `json` parser twice if your original message was json: + +``` +{cluster="us-central1", job="myjob"} | json | container="myapp" | line_format "{{._entry}}" | json | val_from_original_log_json="foo" +``` + +Or any other parser + +``` +{cluster="us-central1", job="myjob"} | json | container="myapp" | line_format "{{._entry}}" | logfmt | val_from_original_log_json="foo" +``` From af9bebede37c80861e8bc5b3ee800ffc6b306d4f Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Sun, 28 Feb 2021 13:29:17 -0500 Subject: [PATCH 06/13] update docs --- docs/sources/clients/promtail/stages/wrap.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/sources/clients/promtail/stages/wrap.md b/docs/sources/clients/promtail/stages/wrap.md index 40832cf6848d8..c7cbe5ef133a9 100644 --- a/docs/sources/clients/promtail/stages/wrap.md +++ b/docs/sources/clients/promtail/stages/wrap.md @@ -15,6 +15,8 @@ For example, if you wanted to remove the labels `container` and `pod` but still } ``` +The original message will be stored under the `_entry` key + This stage is useful if you have some label or other metadata you would like to keep but it doesn't make a good label (isn't useful for querying or is too high cardinality) The querying capabilities of Loki make it easy to still access this data and filter/aggregate on it at query time. From 90d209dc50325a1a15822f1e35aa11194121f7af Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Sun, 28 Feb 2021 13:29:28 -0500 Subject: [PATCH 07/13] update docs --- docs/sources/clients/promtail/stages/wrap.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sources/clients/promtail/stages/wrap.md b/docs/sources/clients/promtail/stages/wrap.md index c7cbe5ef133a9..95df4b2f7541a 100644 --- a/docs/sources/clients/promtail/stages/wrap.md +++ b/docs/sources/clients/promtail/stages/wrap.md @@ -15,7 +15,7 @@ For example, if you wanted to remove the labels `container` and `pod` but still } ``` -The original message will be stored under the `_entry` key +The original message will be stored under the `_entry` key. This stage is useful if you have some label or other metadata you would like to keep but it doesn't make a good label (isn't useful for querying or is too high cardinality) From 3092e8baab1e410d6b99b64ebfd306388b7aa071 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Sun, 28 Feb 2021 13:50:22 -0500 Subject: [PATCH 08/13] lint --- pkg/logentry/stages/wrap.go | 13 +++++++------ pkg/logentry/stages/wrap_test.go | 2 +- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/pkg/logentry/stages/wrap.go b/pkg/logentry/stages/wrap.go index fcd97c747842a..0b0c7c371973c 100644 --- a/pkg/logentry/stages/wrap.go +++ b/pkg/logentry/stages/wrap.go @@ -58,10 +58,10 @@ func (w *Wrapped) UnmarshalJSON(data []byte) error { } // MarshalJSON creates a Wrapped struct as JSON where the Labels are flattened into the top level of the object -func (p Wrapped) MarshalJSON() ([]byte, error) { +func (w Wrapped) MarshalJSON() ([]byte, error) { // Marshal the entry to properly escape if it's json or contains quotes - b, err := json.Marshal(p.Entry) + b, err := json.Marshal(w.Entry) if err != nil { return nil, err } @@ -69,8 +69,8 @@ func (p Wrapped) MarshalJSON() ([]byte, error) { // Creating a map and marshalling from a map results in a non deterministic ordering of the resulting json object // This is functionally ok but really annoying to humans and automated tests. // Instead we will build the json ourselves after sorting all the labels to get a consistent output - keys := make([]string, 0, len(p.Labels)) - for k := range p.Labels { + keys := make([]string, 0, len(w.Labels)) + for k := range w.Labels { keys = append(keys, k) } sort.Strings(keys) @@ -90,7 +90,7 @@ func (p Wrapped) MarshalJSON() ([]byte, error) { buf.Write(key) buf.WriteString(":") // marshal value - val, err := json.Marshal(p.Labels[k]) + val, err := json.Marshal(w.Labels[k]) if err != nil { return nil, err } @@ -114,6 +114,7 @@ type WrapConfig struct { IngestTimestamp *bool `mapstructure:"ingest_timestamp"` } +//nolint:unparam // Always returns nil until someone adds more validation and can remove this. // validateWrapConfig validates the WrapConfig for the wrapStage func validateWrapConfig(cfg *WrapConfig) error { // Default the IngestTimestamp value to be true @@ -192,7 +193,7 @@ func (m *wrapStage) wrap(e Entry) Entry { wl, err := json.Marshal(w) if err != nil { if Debug { - level.Debug(m.logger).Log("msg", fmt.Sprintf("wrap stage failed to marshal wrapped object to json, wrapping will be skipped"), "err", err) + level.Debug(m.logger).Log("msg", "wrap stage failed to marshal wrapped object to json, wrapping will be skipped", "err", err) } return e } diff --git a/pkg/logentry/stages/wrap_test.go b/pkg/logentry/stages/wrap_test.go index 3816b6eb4fd53..ab61addd711cd 100644 --- a/pkg/logentry/stages/wrap_test.go +++ b/pkg/logentry/stages/wrap_test.go @@ -40,7 +40,7 @@ pipeline_stages: // TestDropPipeline is used to verify we properly parse the yaml config and create a working pipeline func TestWrapPipeline(t *testing.T) { registry := prometheus.NewRegistry() - plName := "test_pipeline" + plName := "test_pipeline_deal_with_it_linter" pl, err := NewPipeline(util_log.Logger, loadConfig(testWrapYaml), &plName, registry) require.NoError(t, err) From ff6190bcf8adb455bf5c8d82a8bac9eef35a4161 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Sun, 28 Feb 2021 14:10:19 -0500 Subject: [PATCH 09/13] test flake --- pkg/logentry/stages/wrap_test.go | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/pkg/logentry/stages/wrap_test.go b/pkg/logentry/stages/wrap_test.go index ab61addd711cd..2909bd1e62962 100644 --- a/pkg/logentry/stages/wrap_test.go +++ b/pkg/logentry/stages/wrap_test.go @@ -60,31 +60,27 @@ func TestWrapPipeline(t *testing.T) { testTime := time.Now() - out := processEntries(pl, - newEntry(nil, l1Lbls, testMatchLogLineApp1, testTime), - newEntry(nil, l2Lbls, testRegexLogLine, testTime), - ) - - // Both lines should succeed - assert.Len(t, out, 2) + // Submit these both separately to get a deterministic output + out1 := processEntries(pl, newEntry(nil, l1Lbls, testMatchLogLineApp1, testTime))[0] + out2 := processEntries(pl, newEntry(nil, l2Lbls, testRegexLogLine, testTime))[0] // Expected labels should remove the wrapped labels expectedLbls := model.LabelSet{ "namespace": "dev", "cluster": "us-eu-1", } - assert.Equal(t, expectedLbls, out[0].Labels) - assert.Equal(t, expectedLbls, out[1].Labels) + assert.Equal(t, expectedLbls, out1.Labels) + assert.Equal(t, expectedLbls, out2.Labels) // Validate timestamps // Line 1 should use the first matcher and should use the log line timestamp - assert.Equal(t, testTime, out[0].Timestamp) + assert.Equal(t, testTime, out1.Timestamp) // Line 2 should use the second matcher and should get timestamp by the wrap stage - assert.True(t, out[1].Timestamp.After(testTime)) + assert.True(t, out2.Timestamp.After(testTime)) // Unmarshal the wrapped object and validate line1 w := &Wrapped{} - assert.NoError(t, json.Unmarshal([]byte(out[0].Entry.Entry.Line), w)) + assert.NoError(t, json.Unmarshal([]byte(out1.Entry.Entry.Line), w)) expectedWrappedLbls := map[string]string{ "pod": "foo-xsfs3", "container": "foo", @@ -94,7 +90,7 @@ func TestWrapPipeline(t *testing.T) { // Validate line 2 w = &Wrapped{} - assert.NoError(t, json.Unmarshal([]byte(out[1].Entry.Entry.Line), w)) + assert.NoError(t, json.Unmarshal([]byte(out2.Entry.Entry.Line), w)) expectedWrappedLbls = map[string]string{ "pod": "foo-vvsdded", "container": "bar", From 0ad6f9f13a383b09cea010583ba5e648ec120ee6 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Mon, 1 Mar 2021 14:57:38 -0500 Subject: [PATCH 10/13] rename wrap to pack --- docs/sources/clients/promtail/stages/wrap.md | 18 +++--- pkg/logentry/stages/stage.go | 6 +- pkg/logentry/stages/wrap.go | 60 ++++++++++---------- pkg/logentry/stages/wrap_test.go | 50 ++++++++-------- 4 files changed, 67 insertions(+), 67 deletions(-) diff --git a/docs/sources/clients/promtail/stages/wrap.md b/docs/sources/clients/promtail/stages/wrap.md index 95df4b2f7541a..c815542eb0bca 100644 --- a/docs/sources/clients/promtail/stages/wrap.md +++ b/docs/sources/clients/promtail/stages/wrap.md @@ -1,9 +1,9 @@ --- -title: wrap +title: pack --- -# `wrap` stage +# `pack` stage -The `wrap` stage is a transform stage which lets you embed extracted values and labels into the log line by wrapping the log line in a JSON object. +The `pack` stage is a transform stage which lets you embed extracted values and labels into the log line by packing the log line in a JSON object. For example, if you wanted to remove the labels `container` and `pod` but still wanted to keep their values you could use this stage to create the following output: @@ -21,10 +21,10 @@ This stage is useful if you have some label or other metadata you would like to The querying capabilities of Loki make it easy to still access this data and filter/aggregate on it at query time. -## Wrap stage schema +## Pack stage schema ```yaml -wrap: +pack: # Name from extracted data and/or line labels # Labels provided here are automatically removed from the output labels. labels: @@ -43,7 +43,7 @@ wrap: Removing the container label and embed it into the log line (Kubernetes pods could have multiple containers) ```yaml -wrap: +pack: labels: - container ``` @@ -57,15 +57,15 @@ This would create a log line } ``` -Loki 2.0 has some tools to make querying wrapped log lines easier as well. +Loki 2.0 has some tools to make querying packed log lines easier as well. -Display the log line as if it were never wrapped: +Display the log line as if it were never packed: ``` {cluster="us-central1", job="myjob"} | json | line_format "{{._entry}}" ``` -Use the wrapped labels for filtering: +Use the packed labels for filtering: ``` {cluster="us-central1", job="myjob"} | json | container="myapp" | line_format "{{._entry}}" diff --git a/pkg/logentry/stages/stage.go b/pkg/logentry/stages/stage.go index e06aeaf5ded87..c3e77f066560d 100644 --- a/pkg/logentry/stages/stage.go +++ b/pkg/logentry/stages/stage.go @@ -28,7 +28,7 @@ const ( StageTypeTenant = "tenant" StageTypeDrop = "drop" StageTypeMultiline = "multiline" - StageTypeWrap = "wrap" + StageTypePack = "pack" ) // Processor takes an existing set of labels, timestamp and log entry and returns either a possibly mutated @@ -146,8 +146,8 @@ func New(logger log.Logger, jobName *string, stageType string, if err != nil { return nil, err } - case StageTypeWrap: - s, err = newWrapStage(logger, cfg, registerer) + case StageTypePack: + s, err = newPackStage(logger, cfg, registerer) if err != nil { return nil, err } diff --git a/pkg/logentry/stages/wrap.go b/pkg/logentry/stages/wrap.go index 0b0c7c371973c..dd76aed77fafc 100644 --- a/pkg/logentry/stages/wrap.go +++ b/pkg/logentry/stages/wrap.go @@ -25,13 +25,13 @@ var ( reallyFalse = false ) -type Wrapped struct { +type Packed struct { Labels map[string]string `json:",inline"` Entry string `json:"_entry"` } -// UnmarshalJSON populates a Wrapped struct where every key except the _entry key is added to the Labels field -func (w *Wrapped) UnmarshalJSON(data []byte) error { +// UnmarshalJSON populates a Packed struct where every key except the _entry key is added to the Labels field +func (w *Packed) UnmarshalJSON(data []byte) error { m := &map[string]interface{}{} err := json.Unmarshal(data, m) if err != nil { @@ -57,8 +57,8 @@ func (w *Wrapped) UnmarshalJSON(data []byte) error { return nil } -// MarshalJSON creates a Wrapped struct as JSON where the Labels are flattened into the top level of the object -func (w Wrapped) MarshalJSON() ([]byte, error) { +// MarshalJSON creates a Packed struct as JSON where the Labels are flattened into the top level of the object +func (w Packed) MarshalJSON() ([]byte, error) { // Marshal the entry to properly escape if it's json or contains quotes b, err := json.Marshal(w.Entry) @@ -108,15 +108,15 @@ func (w Wrapped) MarshalJSON() ([]byte, error) { return buf.Bytes(), nil } -// WrapConfig contains the configuration for a wrapStage -type WrapConfig struct { +// PackConfig contains the configuration for a packStage +type PackConfig struct { Labels []string `mapstrcuture:"labels"` IngestTimestamp *bool `mapstructure:"ingest_timestamp"` } //nolint:unparam // Always returns nil until someone adds more validation and can remove this. -// validateWrapConfig validates the WrapConfig for the wrapStage -func validateWrapConfig(cfg *WrapConfig) error { +// validatePackConfig validates the PackConfig for the packStage +func validatePackConfig(cfg *PackConfig) error { // Default the IngestTimestamp value to be true if cfg.IngestTimestamp == nil { cfg.IngestTimestamp = &reallyTrue @@ -124,46 +124,46 @@ func validateWrapConfig(cfg *WrapConfig) error { return nil } -// newWrapStage creates a DropStage from config -func newWrapStage(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error) { - cfg := &WrapConfig{} +// newPackStage creates a DropStage from config +func newPackStage(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error) { + cfg := &PackConfig{} err := mapstructure.WeakDecode(config, cfg) if err != nil { return nil, err } - err = validateWrapConfig(cfg) + err = validatePackConfig(cfg) if err != nil { return nil, err } - return &wrapStage{ - logger: log.With(logger, "component", "stage", "type", "wrap"), + return &packStage{ + logger: log.With(logger, "component", "stage", "type", "pack"), cfg: cfg, dropCount: getDropCountMetric(registerer), }, nil } -// wrapStage applies Label matchers to determine if the include stages should be run -type wrapStage struct { +// packStage applies Label matchers to determine if the include stages should be run +type packStage struct { logger log.Logger - cfg *WrapConfig + cfg *PackConfig dropCount *prometheus.CounterVec } -func (m *wrapStage) Run(in chan Entry) chan Entry { +func (m *packStage) Run(in chan Entry) chan Entry { out := make(chan Entry) go func() { defer close(out) for e := range in { - out <- m.wrap(e) + out <- m.pack(e) } }() return out } -func (m *wrapStage) wrap(e Entry) Entry { +func (m *packStage) pack(e Entry) Entry { lbls := e.Labels - wrappedLabels := make(map[string]string, len(m.cfg.Labels)) + packedLabels := make(map[string]string, len(m.cfg.Labels)) foundLables := []model.LabelName{} // Iterate through all the extracted map (which also includes all the labels) @@ -173,19 +173,19 @@ func (m *wrapStage) wrap(e Entry) Entry { sv, err := getString(lv) if err != nil { if Debug { - level.Debug(m.logger).Log("msg", fmt.Sprintf("value for key: '%s' cannot be converted to a string and cannot be wrapped", lk), "err", err, "type", reflect.TypeOf(lv)) + level.Debug(m.logger).Log("msg", fmt.Sprintf("value for key: '%s' cannot be converted to a string and cannot be packed", lk), "err", err, "type", reflect.TypeOf(lv)) } continue } - wrappedLabels[wl] = sv + packedLabels[wl] = sv foundLables = append(foundLables, model.LabelName(lk)) } } } // Embed the extracted labels into the wrapper object - w := Wrapped{ - Labels: wrappedLabels, + w := Packed{ + Labels: packedLabels, Entry: e.Line, } @@ -193,13 +193,13 @@ func (m *wrapStage) wrap(e Entry) Entry { wl, err := json.Marshal(w) if err != nil { if Debug { - level.Debug(m.logger).Log("msg", "wrap stage failed to marshal wrapped object to json, wrapping will be skipped", "err", err) + level.Debug(m.logger).Log("msg", "pack stage failed to marshal packed object to json, packing will be skipped", "err", err) } return e } // Remove anything found which is also a label, do this after the marshalling to not remove labels until - // we are sure the line can be successfully wrapped. + // we are sure the line can be successfully packed. for _, fl := range foundLables { delete(lbls, fl) } @@ -217,6 +217,6 @@ func (m *wrapStage) wrap(e Entry) Entry { } // Name implements Stage -func (m *wrapStage) Name() string { - return StageTypeWrap +func (m *packStage) Name() string { + return StageTypePack } diff --git a/pkg/logentry/stages/wrap_test.go b/pkg/logentry/stages/wrap_test.go index 2909bd1e62962..cdfc5195d7ce1 100644 --- a/pkg/logentry/stages/wrap_test.go +++ b/pkg/logentry/stages/wrap_test.go @@ -17,12 +17,12 @@ import ( ) // Not all these are tested but are here to make sure the different types marshal without error -var testWrapYaml = ` +var testPackYaml = ` pipeline_stages: - match: selector: "{container=\"foo\"}" stages: - - wrap: + - pack: labels: - pod - container @@ -30,7 +30,7 @@ pipeline_stages: - match: selector: "{container=\"bar\"}" stages: - - wrap: + - pack: labels: - pod - container @@ -38,10 +38,10 @@ pipeline_stages: ` // TestDropPipeline is used to verify we properly parse the yaml config and create a working pipeline -func TestWrapPipeline(t *testing.T) { +func TestPackPipeline(t *testing.T) { registry := prometheus.NewRegistry() plName := "test_pipeline_deal_with_it_linter" - pl, err := NewPipeline(util_log.Logger, loadConfig(testWrapYaml), &plName, registry) + pl, err := NewPipeline(util_log.Logger, loadConfig(testPackYaml), &plName, registry) require.NoError(t, err) l1Lbls := model.LabelSet{ @@ -64,7 +64,7 @@ func TestWrapPipeline(t *testing.T) { out1 := processEntries(pl, newEntry(nil, l1Lbls, testMatchLogLineApp1, testTime))[0] out2 := processEntries(pl, newEntry(nil, l2Lbls, testRegexLogLine, testTime))[0] - // Expected labels should remove the wrapped labels + // Expected labels should remove the packed labels expectedLbls := model.LabelSet{ "namespace": "dev", "cluster": "us-eu-1", @@ -75,31 +75,31 @@ func TestWrapPipeline(t *testing.T) { // Validate timestamps // Line 1 should use the first matcher and should use the log line timestamp assert.Equal(t, testTime, out1.Timestamp) - // Line 2 should use the second matcher and should get timestamp by the wrap stage + // Line 2 should use the second matcher and should get timestamp by the pack stage assert.True(t, out2.Timestamp.After(testTime)) - // Unmarshal the wrapped object and validate line1 - w := &Wrapped{} + // Unmarshal the packed object and validate line1 + w := &Packed{} assert.NoError(t, json.Unmarshal([]byte(out1.Entry.Entry.Line), w)) - expectedWrappedLbls := map[string]string{ + expectedPackedLabels := map[string]string{ "pod": "foo-xsfs3", "container": "foo", } - assert.Equal(t, expectedWrappedLbls, w.Labels) + assert.Equal(t, expectedPackedLabels, w.Labels) assert.Equal(t, testMatchLogLineApp1, w.Entry) // Validate line 2 - w = &Wrapped{} + w = &Packed{} assert.NoError(t, json.Unmarshal([]byte(out2.Entry.Entry.Line), w)) - expectedWrappedLbls = map[string]string{ + expectedPackedLabels = map[string]string{ "pod": "foo-vvsdded", "container": "bar", } - assert.Equal(t, expectedWrappedLbls, w.Labels) + assert.Equal(t, expectedPackedLabels, w.Labels) assert.Equal(t, testRegexLogLine, w.Entry) } -func Test_wrapStage_Run(t *testing.T) { +func Test_packStage_Run(t *testing.T) { // Enable debug logging cfg := &ww.Config{} require.Nil(t, cfg.LogLevel.Set("debug")) @@ -108,13 +108,13 @@ func Test_wrapStage_Run(t *testing.T) { tests := []struct { name string - config *WrapConfig + config *PackConfig inputEntry Entry expectedEntry Entry }{ { name: "no supplied labels list", - config: &WrapConfig{ + config: &PackConfig{ Labels: nil, IngestTimestamp: &reallyFalse, }, @@ -146,7 +146,7 @@ func Test_wrapStage_Run(t *testing.T) { }, { name: "match one supplied label", - config: &WrapConfig{ + config: &PackConfig{ Labels: []string{"foo"}, IngestTimestamp: &reallyFalse, }, @@ -177,7 +177,7 @@ func Test_wrapStage_Run(t *testing.T) { }, { name: "match all supplied labels", - config: &WrapConfig{ + config: &PackConfig{ Labels: []string{"foo", "bar"}, IngestTimestamp: &reallyFalse, }, @@ -206,7 +206,7 @@ func Test_wrapStage_Run(t *testing.T) { }, { name: "match extracted map and labels", - config: &WrapConfig{ + config: &PackConfig{ Labels: []string{"foo", "extr1"}, IngestTimestamp: &reallyFalse, }, @@ -240,7 +240,7 @@ func Test_wrapStage_Run(t *testing.T) { }, { name: "extracted map value not convertable to a string", - config: &WrapConfig{ + config: &PackConfig{ Labels: []string{"foo", "extr2"}, IngestTimestamp: &reallyFalse, }, @@ -274,7 +274,7 @@ func Test_wrapStage_Run(t *testing.T) { }, { name: "escape quotes", - config: &WrapConfig{ + config: &PackConfig{ Labels: []string{"foo", "ex\"tr2"}, IngestTimestamp: &reallyFalse, }, @@ -308,7 +308,7 @@ func Test_wrapStage_Run(t *testing.T) { }, { name: "ingest timestamp", - config: &WrapConfig{ + config: &PackConfig{ Labels: nil, IngestTimestamp: &reallyTrue, }, @@ -341,11 +341,11 @@ func Test_wrapStage_Run(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - err := validateWrapConfig(tt.config) + err := validatePackConfig(tt.config) if err != nil { t.Error(err) } - m, err := newWrapStage(util_log.Logger, tt.config, prometheus.DefaultRegisterer) + m, err := newPackStage(util_log.Logger, tt.config, prometheus.DefaultRegisterer) require.NoError(t, err) // Normal pipeline operation will put all the labels into the extracted map // replicate that here. From 4ee035d69cd2970172e17693f531ed1c903f27d5 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Mon, 1 Mar 2021 14:59:00 -0500 Subject: [PATCH 11/13] fix docs --- docs/sources/clients/promtail/stages/_index.md | 2 +- docs/sources/clients/promtail/stages/{wrap.md => pack.md} | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename docs/sources/clients/promtail/stages/{wrap.md => pack.md} (98%) diff --git a/docs/sources/clients/promtail/stages/_index.md b/docs/sources/clients/promtail/stages/_index.md index b47208ed6662a..5b5e2b833a6db 100644 --- a/docs/sources/clients/promtail/stages/_index.md +++ b/docs/sources/clients/promtail/stages/_index.md @@ -17,7 +17,7 @@ Parsing stages: Transform stages: - [template](template/): Use Go templates to modify extracted data. - - [wrap](wrap/): Wraps a log line in a JSON object allowing extracted values and labels to be placed inside the log line. + - [pack](pack/): Packs a log line in a JSON object allowing extracted values and labels to be placed inside the log line. Action stages: diff --git a/docs/sources/clients/promtail/stages/wrap.md b/docs/sources/clients/promtail/stages/pack.md similarity index 98% rename from docs/sources/clients/promtail/stages/wrap.md rename to docs/sources/clients/promtail/stages/pack.md index c815542eb0bca..af767603c43d3 100644 --- a/docs/sources/clients/promtail/stages/wrap.md +++ b/docs/sources/clients/promtail/stages/pack.md @@ -3,7 +3,7 @@ title: pack --- # `pack` stage -The `pack` stage is a transform stage which lets you embed extracted values and labels into the log line by packing the log line in a JSON object. +The `pack` stage is a transform stage which lets you embed extracted values and labels into the log line by packing the log line and labels inside a JSON object. For example, if you wanted to remove the labels `container` and `pod` but still wanted to keep their values you could use this stage to create the following output: From 48e93129f3acc0fc188b212228fa816cf29dcbab Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Mon, 1 Mar 2021 14:59:43 -0500 Subject: [PATCH 12/13] rename files --- pkg/logentry/stages/{wrap.go => pack.go} | 0 pkg/logentry/stages/{wrap_test.go => pack_test.go} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename pkg/logentry/stages/{wrap.go => pack.go} (100%) rename pkg/logentry/stages/{wrap_test.go => pack_test.go} (100%) diff --git a/pkg/logentry/stages/wrap.go b/pkg/logentry/stages/pack.go similarity index 100% rename from pkg/logentry/stages/wrap.go rename to pkg/logentry/stages/pack.go diff --git a/pkg/logentry/stages/wrap_test.go b/pkg/logentry/stages/pack_test.go similarity index 100% rename from pkg/logentry/stages/wrap_test.go rename to pkg/logentry/stages/pack_test.go From 7f9b5a13e08fd71888e2ad905081c40373f466cd Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Mon, 1 Mar 2021 15:30:12 -0500 Subject: [PATCH 13/13] typo --- pkg/logentry/stages/pack.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/logentry/stages/pack.go b/pkg/logentry/stages/pack.go index dd76aed77fafc..52aee9254a080 100644 --- a/pkg/logentry/stages/pack.go +++ b/pkg/logentry/stages/pack.go @@ -164,7 +164,7 @@ func (m *packStage) Run(in chan Entry) chan Entry { func (m *packStage) pack(e Entry) Entry { lbls := e.Labels packedLabels := make(map[string]string, len(m.cfg.Labels)) - foundLables := []model.LabelName{} + foundLabels := []model.LabelName{} // Iterate through all the extracted map (which also includes all the labels) for lk, lv := range e.Extracted { @@ -178,7 +178,7 @@ func (m *packStage) pack(e Entry) Entry { continue } packedLabels[wl] = sv - foundLables = append(foundLables, model.LabelName(lk)) + foundLabels = append(foundLabels, model.LabelName(lk)) } } } @@ -200,7 +200,7 @@ func (m *packStage) pack(e Entry) Entry { // Remove anything found which is also a label, do this after the marshalling to not remove labels until // we are sure the line can be successfully packed. - for _, fl := range foundLables { + for _, fl := range foundLabels { delete(lbls, fl) }