diff --git a/entry/entry.go b/entry/entry.go index 7f4d9849..f10d7b56 100644 --- a/entry/entry.go +++ b/entry/entry.go @@ -48,6 +48,9 @@ type Entry struct { Attributes map[string]string `json:"attributes,omitempty" yaml:"attributes,omitempty"` Resource map[string]string `json:"resource,omitempty" yaml:"resource,omitempty"` Record interface{} `json:"record" yaml:"record"` + TraceId []byte `json:"trace_id" yaml:"trace_id"` + SpanId []byte `json:"span_id" yaml:"span_id"` + TraceFlags []byte `json:"trace_flags" yaml:"trace_flags"` } // New will create a new log entry with current timestamp and an empty record. @@ -196,5 +199,8 @@ func (entry *Entry) Copy() *Entry { Attributes: copyStringMap(entry.Attributes), Resource: copyStringMap(entry.Resource), Record: copyValue(entry.Record), + TraceId: entry.TraceId, + SpanId: entry.SpanId, + TraceFlags: entry.TraceFlags, } } diff --git a/entry/entry_test.go b/entry/entry_test.go index 21a15a80..9e1965cf 100644 --- a/entry/entry_test.go +++ b/entry/entry_test.go @@ -141,6 +141,9 @@ func TestCopy(t *testing.T) { entry.Record = "test" entry.Attributes = map[string]string{"label": "value"} entry.Resource = map[string]string{"resource": "value"} + entry.TraceId = []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f} + entry.SpanId = []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08} + entry.TraceFlags = []byte{0x01} copy := entry.Copy() entry.Severity = Severity(1) @@ -149,6 +152,9 @@ func TestCopy(t *testing.T) { entry.Record = "new" entry.Attributes = map[string]string{"label": "new value"} entry.Resource = map[string]string{"resource": "new value"} + entry.TraceId = []byte{0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08} + entry.SpanId = []byte{0x04, 0x05, 0x06, 0x07, 0x08, 0x00, 0x01, 0x02, 0x03} + entry.TraceFlags = []byte{0x00} require.Equal(t, time.Time{}, copy.Timestamp) require.Equal(t, Severity(0), copy.Severity) @@ -156,6 +162,9 @@ func TestCopy(t *testing.T) { require.Equal(t, map[string]string{"label": "value"}, copy.Attributes) require.Equal(t, map[string]string{"resource": "value"}, copy.Resource) require.Equal(t, "test", copy.Record) + require.Equal(t, []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f}, copy.TraceId) + require.Equal(t, []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}, copy.SpanId) + require.Equal(t, []byte{0x01}, copy.TraceFlags) } func TestFieldFromString(t *testing.T) { diff --git a/operator/builtin/output/stdout/stdout_test.go b/operator/builtin/output/stdout/stdout_test.go index 5aa83535..cf0f1490 100644 --- a/operator/builtin/output/stdout/stdout_test.go +++ b/operator/builtin/output/stdout/stdout_test.go @@ -56,6 +56,6 @@ func TestStdoutOperator(t *testing.T) { marshalledTimestamp, err := json.Marshal(ts) require.NoError(t, err) - expected := `{"timestamp":` + string(marshalledTimestamp) + `,"severity":0,"record":"test record"}` + "\n" + expected := `{"timestamp":` + string(marshalledTimestamp) + `,"severity":0,"record":"test record","trace_id":null,"span_id":null,"trace_flags":null}` + "\n" require.Equal(t, expected, buf.String()) } diff --git a/operator/builtin/parser/trace/trace.go b/operator/builtin/parser/trace/trace.go new file mode 100644 index 00000000..0464054e --- /dev/null +++ b/operator/builtin/parser/trace/trace.go @@ -0,0 +1,71 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package trace + +import ( + "context" + + "github.com/open-telemetry/opentelemetry-log-collection/entry" + "github.com/open-telemetry/opentelemetry-log-collection/operator" + "github.com/open-telemetry/opentelemetry-log-collection/operator/helper" +) + +func init() { + operator.Register("trace_parser", func() operator.Builder { return NewTraceParserConfig("") }) +} + +// NewTraceParserConfig creates a new trace parser config with default values +func NewTraceParserConfig(operatorID string) *TraceParserConfig { + return &TraceParserConfig{ + TransformerConfig: helper.NewTransformerConfig(operatorID, "trace_parser"), + TraceParser: helper.NewTraceParser(), + } +} + +// SeverityParserConfig is the configuration of a severity parser operator. +type TraceParserConfig struct { + helper.TransformerConfig `mapstructure:",squash" yaml:",inline"` + helper.TraceParser `mapstructure:",omitempty,squash" yaml:",omitempty,inline"` +} + +// Build will build a time parser operator. +func (c TraceParserConfig) Build(context operator.BuildContext) ([]operator.Operator, error) { + transformerOperator, err := c.TransformerConfig.Build(context) + if err != nil { + return nil, err + } + + if err := c.TraceParser.Validate(context); err != nil { + return nil, err + } + + traceOperator := &TraceParserOperator{ + TransformerOperator: transformerOperator, + TraceParser: c.TraceParser, + } + + return []operator.Operator{traceOperator}, nil +} + +// SeverityParserOperator is an operator that parses time from a field to an entry. +type TraceParserOperator struct { + helper.TransformerOperator + helper.TraceParser +} + +// Process will parse time from an entry. +func (p *TraceParserOperator) Process(ctx context.Context, entry *entry.Entry) error { + return p.ProcessWith(ctx, entry, p.Parse) +} diff --git a/operator/builtin/parser/trace/trace_test.go b/operator/builtin/parser/trace/trace_test.go new file mode 100644 index 00000000..2044da96 --- /dev/null +++ b/operator/builtin/parser/trace/trace_test.go @@ -0,0 +1,154 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package trace + +import ( + "encoding/hex" + "testing" + + "github.com/open-telemetry/opentelemetry-log-collection/entry" + "github.com/open-telemetry/opentelemetry-log-collection/testutil" + "github.com/stretchr/testify/require" +) + +func TestDefaultParser(t *testing.T) { + traceParserConfig := NewTraceParserConfig("") + _, err := traceParserConfig.Build(testutil.NewBuildContext(t)) + require.NoError(t, err) +} + +func TestTraceParserParse(t *testing.T) { + cases := []struct { + name string + inputRecord map[string]interface{} + expectedRecord map[string]interface{} + expectErr bool + traceId string + spanId string + traceFlags string + }{ + { + "AllFields", + map[string]interface{}{ + "trace_id": "480140f3d770a5ae32f0a22b6a812cff", + "span_id": "92c3792d54ba94f3", + "trace_flags": "01", + }, + map[string]interface{}{}, + false, + "480140f3d770a5ae32f0a22b6a812cff", + "92c3792d54ba94f3", + "01", + }, + { + "WrongFields", + map[string]interface{}{ + "traceId": "480140f3d770a5ae32f0a22b6a812cff", + "traceFlags": "01", + "spanId": "92c3792d54ba94f3", + }, + map[string]interface{}{ + "traceId": "480140f3d770a5ae32f0a22b6a812cff", + "spanId": "92c3792d54ba94f3", + "traceFlags": "01", + }, + false, + "", + "", + "", + }, + { + "OnlyTraceId", + map[string]interface{}{ + "trace_id": "480140f3d770a5ae32f0a22b6a812cff", + }, + map[string]interface{}{}, + false, + "480140f3d770a5ae32f0a22b6a812cff", + "", + "", + }, + { + "WrongTraceIdFormat", + map[string]interface{}{ + "trace_id": "foo_bar", + "span_id": "92c3792d54ba94f3", + "trace_flags": "01", + }, + map[string]interface{}{}, + true, + "", + "92c3792d54ba94f3", + "01", + }, + { + "WrongTraceFlagFormat", + map[string]interface{}{ + "trace_id": "480140f3d770a5ae32f0a22b6a812cff", + "span_id": "92c3792d54ba94f3", + "trace_flags": "foo_bar", + }, + map[string]interface{}{}, + true, + "480140f3d770a5ae32f0a22b6a812cff", + "92c3792d54ba94f3", + "", + }, + { + "AllFields", + map[string]interface{}{ + "trace_id": "480140f3d770a5ae32f0a22b6a812cff", + "span_id": "92c3792d54ba94f3", + "trace_flags": "01", + }, + map[string]interface{}{}, + false, + "480140f3d770a5ae32f0a22b6a812cff", + "92c3792d54ba94f3", + "01", + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + traceParserConfig := NewTraceParserConfig("") + _, _ = traceParserConfig.Build(testutil.NewBuildContext(t)) + e := entry.New() + e.Record = tc.inputRecord + err := traceParserConfig.Parse(e) + if tc.expectErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + require.Equal(t, tc.expectedRecord, e.Record) + traceId, _ := hex.DecodeString(tc.traceId) + if len(tc.traceId) == 0 { + traceId = nil + } + spanId, _ := hex.DecodeString(tc.spanId) + if len(tc.spanId) == 0 { + spanId = nil + } + traceFlags, _ := hex.DecodeString(tc.traceFlags) + if len(tc.traceFlags) == 0 { + traceFlags = nil + } + require.Equal(t, traceId, e.TraceId) + require.Equal(t, spanId, e.SpanId) + require.Equal(t, traceFlags, e.TraceFlags) + }) + } +} diff --git a/operator/helper/parser.go b/operator/helper/parser.go index 4b58a0f2..5adcc27b 100644 --- a/operator/helper/parser.go +++ b/operator/helper/parser.go @@ -41,6 +41,7 @@ type ParserConfig struct { PreserveTo *entry.Field `mapstructure:"preserve_to" json:"preserve_to" yaml:"preserve_to"` TimeParser *TimeParser `mapstructure:"timestamp,omitempty" json:"timestamp,omitempty" yaml:"timestamp,omitempty"` SeverityParserConfig *SeverityParserConfig `mapstructure:"severity,omitempty" json:"severity,omitempty" yaml:"severity,omitempty"` + TraceParser *TraceParser `mapstructure:"trace,omitempty" json:"trace,omitempty" yaml:"trace,omitempty"` } // Build will build a parser operator. @@ -72,6 +73,13 @@ func (c ParserConfig) Build(context operator.BuildContext) (ParserOperator, erro parserOperator.SeverityParser = &severityParser } + if c.TraceParser != nil { + if err := c.TraceParser.Validate(context); err != nil { + return ParserOperator{}, err + } + parserOperator.TraceParser = c.TraceParser + } + return parserOperator, nil } @@ -83,6 +91,7 @@ type ParserOperator struct { PreserveTo *entry.Field TimeParser *TimeParser SeverityParser *SeverityParser + TraceParser *TraceParser } // ProcessWith will run ParseWith on the entry, then forward the entry on to the next operators. @@ -154,6 +163,10 @@ func (p *ParserOperator) ParseWith(ctx context.Context, entry *entry.Entry, pars severityParseErr = p.SeverityParser.Parse(entry) } + if p.TraceParser != nil { + _ = p.TraceParser.Parse(entry) + } + // Handle time or severity parsing errors after attempting to parse both if timeParseErr != nil { return p.HandleEntryError(ctx, entry, errors.Wrap(timeParseErr, "time parser")) diff --git a/operator/helper/trace.go b/operator/helper/trace.go new file mode 100644 index 00000000..501541a5 --- /dev/null +++ b/operator/helper/trace.go @@ -0,0 +1,131 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package helper + +import ( + "encoding/hex" + "fmt" + + "github.com/open-telemetry/opentelemetry-log-collection/entry" + "github.com/open-telemetry/opentelemetry-log-collection/errors" + "github.com/open-telemetry/opentelemetry-log-collection/operator" +) + +// NewTraceParser creates a new trace parser with default values +func NewTraceParser() TraceParser { + traceId := entry.NewRecordField("trace_id") + spanId := entry.NewRecordField("span_id") + traceFlags := entry.NewRecordField("trace_flags") + return TraceParser{ + TraceId: &TraceIdConfig{ + ParseFrom: &traceId, + }, + SpanId: &SpanIdConfig{ + ParseFrom: &spanId, + }, + TraceFlags: &TraceFlagsConfig{ + ParseFrom: &traceFlags, + }, + } +} + +// TraceParser is a helper that parses trace spans (and flags) onto an entry. +type TraceParser struct { + TraceId *TraceIdConfig `mapstructure:"trace_id,omitempty" json:"trace_id,omitempty" yaml:"trace_id,omitempty"` + SpanId *SpanIdConfig `mapstructure:"span_id,omitempty" json:"span_id,omitempty" yaml:"span_id,omitempty"` + TraceFlags *TraceFlagsConfig `mapstructure:"trace_flags,omitempty" json:"trace_flags,omitempty" yaml:"trace_flags,omitempty"` +} + +type TraceIdConfig struct { + ParseFrom *entry.Field `mapstructure:"parse_from,omitempty" json:"parse_from,omitempty" yaml:"parse_from,omitempty"` + PreserveTo *entry.Field `mapstructure:"preserve_to,omitempty" json:"preserve_to,omitempty" yaml:"preserve_to,omitempty"` +} + +type SpanIdConfig struct { + ParseFrom *entry.Field `mapstructure:"parse_from,omitempty" json:"parse_from,omitempty" yaml:"parse_from,omitempty"` + PreserveTo *entry.Field `mapstructure:"preserve_to,omitempty" json:"preserve_to,omitempty" yaml:"preserve_to,omitempty"` +} + +type TraceFlagsConfig struct { + ParseFrom *entry.Field `mapstructure:"parse_from,omitempty" json:"parse_from,omitempty" yaml:"parse_from,omitempty"` + PreserveTo *entry.Field `mapstructure:"preserve_to,omitempty" json:"preserve_to,omitempty" yaml:"preserve_to,omitempty"` +} + +// Validate validates a TraceParser, and reconfigures it if necessary +func (t *TraceParser) Validate(context operator.BuildContext) error { + if t.TraceId == nil { + t.TraceId = &TraceIdConfig{} + } + if t.TraceId.ParseFrom == nil { + field := entry.NewRecordField("trace_id") + t.TraceId.ParseFrom = &field + } + if t.SpanId == nil { + t.SpanId = &SpanIdConfig{} + } + if t.SpanId.ParseFrom == nil { + field := entry.NewRecordField("span_id") + t.SpanId.ParseFrom = &field + } + if t.TraceFlags == nil { + t.TraceFlags = &TraceFlagsConfig{} + } + if t.TraceFlags.ParseFrom == nil { + field := entry.NewRecordField("trace_flags") + t.TraceFlags.ParseFrom = &field + } + return nil +} + +// Best effort hex parsing for trace, spans and flags +func parseHexField(entry *entry.Entry, field *entry.Field, to *entry.Field) ([]byte, error) { + value, ok := entry.Delete(field) + if !ok { + return nil, nil + } + + data, err := hex.DecodeString(fmt.Sprintf("%v", value)) + if err != nil { + return nil, err + } + + if to != nil { + err = entry.Set(to, value) + return data, err + } + return data, nil +} + +// Parse will parse a trace (trace_id, span_id and flags) from a field and attach it to the entry +func (t *TraceParser) Parse(entry *entry.Entry) error { + var errTraceId, errSpanId, errTraceFlags error + entry.TraceId, errTraceId = parseHexField(entry, t.TraceId.ParseFrom, t.TraceId.PreserveTo) + entry.SpanId, errSpanId = parseHexField(entry, t.SpanId.ParseFrom, t.SpanId.PreserveTo) + entry.TraceFlags, errTraceFlags = parseHexField(entry, t.TraceFlags.ParseFrom, t.TraceFlags.PreserveTo) + if errTraceId != nil || errTraceFlags != nil || errSpanId != nil { + err := errors.NewError("Error decoding traces for logs", "") + if errTraceId != nil { + _ = err.WithDetails("trace_id", errTraceId.Error()) + } + if errSpanId != nil { + _ = err.WithDetails("span_id", errSpanId.Error()) + } + if errTraceFlags != nil { + _ = err.WithDetails("trace_flags", errTraceFlags.Error()) + } + return err + } + return nil +} diff --git a/operator/helper/trace_test.go b/operator/helper/trace_test.go new file mode 100644 index 00000000..7a2f377f --- /dev/null +++ b/operator/helper/trace_test.go @@ -0,0 +1,98 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package helper + +import ( + "encoding/hex" + "testing" + + "github.com/open-telemetry/opentelemetry-log-collection/entry" + "github.com/open-telemetry/opentelemetry-log-collection/testutil" + "github.com/stretchr/testify/require" +) + +func TestValidateDoesntChangeFields(t *testing.T) { + traceId := entry.NewRecordField("traceId") + spanId := entry.NewRecordField("spanId") + traceFlags := entry.NewRecordField("traceFlags") + parser := TraceParser{ + TraceId: &TraceIdConfig{ + ParseFrom: &traceId, + }, + SpanId: &SpanIdConfig{ + ParseFrom: &spanId, + }, + TraceFlags: &TraceFlagsConfig{ + ParseFrom: &traceFlags, + }, + } + err := parser.Validate(testutil.NewBuildContext(t)) + require.NoError(t, err) + require.Equal(t, &traceId, parser.TraceId.ParseFrom) + require.Equal(t, &spanId, parser.SpanId.ParseFrom) + require.Equal(t, &traceFlags, parser.TraceFlags.ParseFrom) +} + +func TestValidateSetsDefaultFields(t *testing.T) { + traceId := entry.NewRecordField("trace_id") + spanId := entry.NewRecordField("span_id") + traceFlags := entry.NewRecordField("trace_flags") + parser := TraceParser{} + err := parser.Validate(testutil.NewBuildContext(t)) + require.NoError(t, err) + require.Equal(t, &traceId, parser.TraceId.ParseFrom) + require.Equal(t, &spanId, parser.SpanId.ParseFrom) + require.Equal(t, &traceFlags, parser.TraceFlags.ParseFrom) +} + +func TestPreserveFields(t *testing.T) { + traceId := entry.NewRecordField("traceId") + spanId := entry.NewRecordField("spanId") + traceFlags := entry.NewRecordField("traceFlags") + parser := TraceParser{ + TraceId: &TraceIdConfig{ + PreserveTo: &traceId, + }, + SpanId: &SpanIdConfig{ + PreserveTo: &spanId, + }, + TraceFlags: &TraceFlagsConfig{ + PreserveTo: &traceFlags, + }, + } + err := parser.Validate(testutil.NewBuildContext(t)) + require.NoError(t, err) + + entry := entry.New() + entry.Record = map[string]interface{}{ + "trace_id": "480140f3d770a5ae32f0a22b6a812cff", + "span_id": "92c3792d54ba94f3", + "trace_flags": "01", + } + err = parser.Parse(entry) + require.NoError(t, err) + require.Equal(t, map[string]interface{}{ + "traceId": "480140f3d770a5ae32f0a22b6a812cff", + "spanId": "92c3792d54ba94f3", + "traceFlags": "01", + }, entry.Record) + + value, _ := hex.DecodeString("480140f3d770a5ae32f0a22b6a812cff") + require.Equal(t, value, entry.TraceId) + value, _ = hex.DecodeString("92c3792d54ba94f3") + require.Equal(t, value, entry.SpanId) + value, _ = hex.DecodeString("01") + require.Equal(t, value, entry.TraceFlags) +}