diff --git a/entry/entry.go b/entry/entry.go index 1c402c4d..7c53656a 100644 --- a/entry/entry.go +++ b/entry/entry.go @@ -48,6 +48,9 @@ type Entry struct { Attributes map[string]string `json:"attributes,omitempty" yaml:"attributes,omitempty"` Resource map[string]string `json:"resource,omitempty" yaml:"resource,omitempty"` Body interface{} `json:"body" yaml:"body"` + TraceId []byte `json:"trace_id,omitempty" yaml:"trace_id,omitempty"` + SpanId []byte `json:"span_id,omitempty" yaml:"span_id,omitempty"` + TraceFlags []byte `json:"trace_flags,omitempty" yaml:"trace_flags,omitempty"` } // New will create a new log entry with current timestamp and an empty body. @@ -196,5 +199,8 @@ func (entry *Entry) Copy() *Entry { Attributes: copyStringMap(entry.Attributes), Resource: copyStringMap(entry.Resource), Body: copyValue(entry.Body), + TraceId: copyByteArray(entry.TraceId), + SpanId: copyByteArray(entry.SpanId), + TraceFlags: copyByteArray(entry.TraceFlags), } } diff --git a/entry/entry_test.go b/entry/entry_test.go index c9f7a799..ff0b60b8 100644 --- a/entry/entry_test.go +++ b/entry/entry_test.go @@ -141,6 +141,9 @@ func TestCopy(t *testing.T) { entry.Body = "test" entry.Attributes = map[string]string{"label": "value"} entry.Resource = map[string]string{"resource": "value"} + entry.TraceId = []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f} + entry.SpanId = []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08} + entry.TraceFlags = []byte{0x01} copy := entry.Copy() entry.Severity = Severity(1) @@ -149,6 +152,9 @@ func TestCopy(t *testing.T) { entry.Body = "new" entry.Attributes = map[string]string{"label": "new value"} entry.Resource = map[string]string{"resource": "new value"} + entry.TraceId[0] = 0xff + entry.SpanId[0] = 0xff + entry.TraceFlags[0] = 0xff require.Equal(t, time.Time{}, copy.Timestamp) require.Equal(t, Severity(0), copy.Severity) @@ -156,6 +162,35 @@ func TestCopy(t *testing.T) { require.Equal(t, map[string]string{"label": "value"}, copy.Attributes) require.Equal(t, map[string]string{"resource": "value"}, copy.Resource) require.Equal(t, "test", copy.Body) + require.Equal(t, []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f}, copy.TraceId) + require.Equal(t, []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}, copy.SpanId) + require.Equal(t, []byte{0x01}, copy.TraceFlags) +} + +func TestCopyNil(t *testing.T) { + entry := New() + entry.Timestamp = time.Time{} + copy := entry.Copy() + + entry.Severity = Severity(1) + entry.SeverityText = "1" + entry.Timestamp = time.Now() + entry.Body = "new" + entry.Attributes = map[string]string{"label": "new value"} + entry.Resource = map[string]string{"resource": "new value"} + entry.TraceId = []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f} + entry.SpanId = []byte{0x04, 0x05, 0x06, 0x07, 0x08, 0x00, 0x01, 0x02, 0x03} + entry.TraceFlags = []byte{0x01} + + require.Equal(t, time.Time{}, copy.Timestamp) + require.Equal(t, Severity(0), copy.Severity) + require.Equal(t, "", copy.SeverityText) + require.Equal(t, map[string]string{}, copy.Attributes) + require.Equal(t, map[string]string{}, copy.Resource) + require.Equal(t, nil, copy.Body) + require.Equal(t, []byte{}, copy.TraceId) + require.Equal(t, []byte{}, copy.SpanId) + require.Equal(t, []byte{}, copy.TraceFlags) } func TestFieldFromString(t *testing.T) { diff --git a/operator/builtin/parser/trace/trace.go b/operator/builtin/parser/trace/trace.go new file mode 100644 index 00000000..d699f469 --- /dev/null +++ b/operator/builtin/parser/trace/trace.go @@ -0,0 +1,71 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package trace + +import ( + "context" + + "github.com/open-telemetry/opentelemetry-log-collection/entry" + "github.com/open-telemetry/opentelemetry-log-collection/operator" + "github.com/open-telemetry/opentelemetry-log-collection/operator/helper" +) + +func init() { + operator.Register("trace_parser", func() operator.Builder { return NewTraceParserConfig("") }) +} + +// NewTraceParserConfig creates a new trace parser config with default values +func NewTraceParserConfig(operatorID string) *TraceParserConfig { + return &TraceParserConfig{ + TransformerConfig: helper.NewTransformerConfig(operatorID, "trace_parser"), + TraceParser: helper.NewTraceParser(), + } +} + +// TraceParserConfig is the configuration of a trace parser operator. +type TraceParserConfig struct { + helper.TransformerConfig `mapstructure:",squash" yaml:",inline"` + helper.TraceParser `mapstructure:",omitempty,squash" yaml:",omitempty,inline"` +} + +// Build will build a trace parser operator. +func (c TraceParserConfig) Build(context operator.BuildContext) ([]operator.Operator, error) { + transformerOperator, err := c.TransformerConfig.Build(context) + if err != nil { + return nil, err + } + + if err := c.TraceParser.Validate(context); err != nil { + return nil, err + } + + traceOperator := &TraceParserOperator{ + TransformerOperator: transformerOperator, + TraceParser: c.TraceParser, + } + + return []operator.Operator{traceOperator}, nil +} + +// TraceParserConfig is an operator that parses traces from fields to an entry. +type TraceParserOperator struct { + helper.TransformerOperator + helper.TraceParser +} + +// Process will parse traces from an entry. +func (p *TraceParserOperator) Process(ctx context.Context, entry *entry.Entry) error { + return p.ProcessWith(ctx, entry, p.Parse) +} diff --git a/operator/builtin/parser/trace/trace_test.go b/operator/builtin/parser/trace/trace_test.go new file mode 100644 index 00000000..072f266b --- /dev/null +++ b/operator/builtin/parser/trace/trace_test.go @@ -0,0 +1,158 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package trace + +import ( + "encoding/hex" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-log-collection/entry" + "github.com/open-telemetry/opentelemetry-log-collection/testutil" +) + +func TestDefaultParser(t *testing.T) { + traceParserConfig := NewTraceParserConfig("") + _, err := traceParserConfig.Build(testutil.NewBuildContext(t)) + require.NoError(t, err) +} + +func TestTraceParserParse(t *testing.T) { + cases := []struct { + name string + inputRecord map[string]interface{} + expectedRecord map[string]interface{} + expectErr bool + traceId string + spanId string + traceFlags string + }{ + { + "AllFields", + map[string]interface{}{ + "trace_id": "480140f3d770a5ae32f0a22b6a812cff", + "span_id": "92c3792d54ba94f3", + "trace_flags": "01", + }, + map[string]interface{}{}, + false, + "480140f3d770a5ae32f0a22b6a812cff", + "92c3792d54ba94f3", + "01", + }, + { + "WrongFields", + map[string]interface{}{ + "traceId": "480140f3d770a5ae32f0a22b6a812cff", + "traceFlags": "01", + "spanId": "92c3792d54ba94f3", + }, + map[string]interface{}{ + "traceId": "480140f3d770a5ae32f0a22b6a812cff", + "spanId": "92c3792d54ba94f3", + "traceFlags": "01", + }, + false, + "", + "", + "", + }, + { + "OnlyTraceId", + map[string]interface{}{ + "trace_id": "480140f3d770a5ae32f0a22b6a812cff", + }, + map[string]interface{}{}, + false, + "480140f3d770a5ae32f0a22b6a812cff", + "", + "", + }, + { + "WrongTraceIdFormat", + map[string]interface{}{ + "trace_id": "foo_bar", + "span_id": "92c3792d54ba94f3", + "trace_flags": "01", + }, + map[string]interface{}{}, + true, + "", + "92c3792d54ba94f3", + "01", + }, + { + "WrongTraceFlagFormat", + map[string]interface{}{ + "trace_id": "480140f3d770a5ae32f0a22b6a812cff", + "span_id": "92c3792d54ba94f3", + "trace_flags": "foo_bar", + }, + map[string]interface{}{}, + true, + "480140f3d770a5ae32f0a22b6a812cff", + "92c3792d54ba94f3", + "", + }, + { + "AllFields", + map[string]interface{}{ + "trace_id": "480140f3d770a5ae32f0a22b6a812cff", + "span_id": "92c3792d54ba94f3", + "trace_flags": "01", + }, + map[string]interface{}{}, + false, + "480140f3d770a5ae32f0a22b6a812cff", + "92c3792d54ba94f3", + "01", + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + traceParserConfig := NewTraceParserConfig("") + _, _ = traceParserConfig.Build(testutil.NewBuildContext(t)) + e := entry.New() + e.Body = tc.inputRecord + err := traceParserConfig.Parse(e) + if tc.expectErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + require.Equal(t, tc.expectedRecord, e.Body) + traceId, _ := hex.DecodeString(tc.traceId) + if len(tc.traceId) == 0 { + require.Nil(t, e.TraceId) + } else { + require.Equal(t, traceId, e.TraceId) + } + spanId, _ := hex.DecodeString(tc.spanId) + if len(tc.spanId) == 0 { + require.Nil(t, e.SpanId) + } else { + require.Equal(t, spanId, e.SpanId) + } + traceFlags, _ := hex.DecodeString(tc.traceFlags) + if len(tc.traceFlags) == 0 { + require.Nil(t, e.TraceFlags) + } else { + require.Equal(t, traceFlags, e.TraceFlags) + } + }) + } +} diff --git a/operator/builtin/transformer/noop/noop_test.go b/operator/builtin/transformer/noop/noop_test.go index faa86799..4ee87d4e 100644 --- a/operator/builtin/transformer/noop/noop_test.go +++ b/operator/builtin/transformer/noop/noop_test.go @@ -55,6 +55,9 @@ func TestProcess(t *testing.T) { entry := entry.New() entry.AddAttribute("label", "value") entry.AddResourceKey("resource", "value") + entry.TraceId = []byte{0x01} + entry.SpanId = []byte{0x01} + entry.TraceFlags = []byte{0x01} expected := entry.Copy() err = op.Process(context.Background(), entry) diff --git a/operator/helper/parser.go b/operator/helper/parser.go index e8ec347a..4c05c21e 100644 --- a/operator/helper/parser.go +++ b/operator/helper/parser.go @@ -41,6 +41,7 @@ type ParserConfig struct { PreserveTo *entry.Field `mapstructure:"preserve_to" json:"preserve_to" yaml:"preserve_to"` TimeParser *TimeParser `mapstructure:"timestamp,omitempty" json:"timestamp,omitempty" yaml:"timestamp,omitempty"` SeverityParserConfig *SeverityParserConfig `mapstructure:"severity,omitempty" json:"severity,omitempty" yaml:"severity,omitempty"` + TraceParser *TraceParser `mapstructure:"trace,omitempty" json:"trace,omitempty" yaml:"trace,omitempty"` } // Build will build a parser operator. @@ -72,6 +73,13 @@ func (c ParserConfig) Build(context operator.BuildContext) (ParserOperator, erro parserOperator.SeverityParser = &severityParser } + if c.TraceParser != nil { + if err := c.TraceParser.Validate(context); err != nil { + return ParserOperator{}, err + } + parserOperator.TraceParser = c.TraceParser + } + return parserOperator, nil } @@ -83,6 +91,7 @@ type ParserOperator struct { PreserveTo *entry.Field TimeParser *TimeParser SeverityParser *SeverityParser + TraceParser *TraceParser } // ProcessWith will run ParseWith on the entry, then forward the entry on to the next operators. @@ -154,6 +163,11 @@ func (p *ParserOperator) ParseWith(ctx context.Context, entry *entry.Entry, pars severityParseErr = p.SeverityParser.Parse(entry) } + var traceParseErr error + if p.TraceParser != nil { + traceParseErr = p.TraceParser.Parse(entry) + } + // Handle time or severity parsing errors after attempting to parse both if timeParseErr != nil { return p.HandleEntryError(ctx, entry, errors.Wrap(timeParseErr, "time parser")) @@ -161,6 +175,9 @@ func (p *ParserOperator) ParseWith(ctx context.Context, entry *entry.Entry, pars if severityParseErr != nil { return p.HandleEntryError(ctx, entry, errors.Wrap(severityParseErr, "severity parser")) } + if traceParseErr != nil { + return p.HandleEntryError(ctx, entry, errors.Wrap(traceParseErr, "trace parser")) + } return nil } diff --git a/operator/helper/trace.go b/operator/helper/trace.go new file mode 100644 index 00000000..7d2966dd --- /dev/null +++ b/operator/helper/trace.go @@ -0,0 +1,131 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package helper + +import ( + "encoding/hex" + "fmt" + + "github.com/open-telemetry/opentelemetry-log-collection/entry" + "github.com/open-telemetry/opentelemetry-log-collection/errors" + "github.com/open-telemetry/opentelemetry-log-collection/operator" +) + +// NewTraceParser creates a new trace parser with default values +func NewTraceParser() TraceParser { + traceId := entry.NewBodyField("trace_id") + spanId := entry.NewBodyField("span_id") + traceFlags := entry.NewBodyField("trace_flags") + return TraceParser{ + TraceId: &TraceIdConfig{ + ParseFrom: &traceId, + }, + SpanId: &SpanIdConfig{ + ParseFrom: &spanId, + }, + TraceFlags: &TraceFlagsConfig{ + ParseFrom: &traceFlags, + }, + } +} + +// TraceParser is a helper that parses trace spans (and flags) onto an entry. +type TraceParser struct { + TraceId *TraceIdConfig `mapstructure:"trace_id,omitempty" json:"trace_id,omitempty" yaml:"trace_id,omitempty"` + SpanId *SpanIdConfig `mapstructure:"span_id,omitempty" json:"span_id,omitempty" yaml:"span_id,omitempty"` + TraceFlags *TraceFlagsConfig `mapstructure:"trace_flags,omitempty" json:"trace_flags,omitempty" yaml:"trace_flags,omitempty"` +} + +type TraceIdConfig struct { + ParseFrom *entry.Field `mapstructure:"parse_from,omitempty" json:"parse_from,omitempty" yaml:"parse_from,omitempty"` + PreserveTo *entry.Field `mapstructure:"preserve_to,omitempty" json:"preserve_to,omitempty" yaml:"preserve_to,omitempty"` +} + +type SpanIdConfig struct { + ParseFrom *entry.Field `mapstructure:"parse_from,omitempty" json:"parse_from,omitempty" yaml:"parse_from,omitempty"` + PreserveTo *entry.Field `mapstructure:"preserve_to,omitempty" json:"preserve_to,omitempty" yaml:"preserve_to,omitempty"` +} + +type TraceFlagsConfig struct { + ParseFrom *entry.Field `mapstructure:"parse_from,omitempty" json:"parse_from,omitempty" yaml:"parse_from,omitempty"` + PreserveTo *entry.Field `mapstructure:"preserve_to,omitempty" json:"preserve_to,omitempty" yaml:"preserve_to,omitempty"` +} + +// Validate validates a TraceParser, and reconfigures it if necessary +func (t *TraceParser) Validate(context operator.BuildContext) error { + if t.TraceId == nil { + t.TraceId = &TraceIdConfig{} + } + if t.TraceId.ParseFrom == nil { + field := entry.NewBodyField("trace_id") + t.TraceId.ParseFrom = &field + } + if t.SpanId == nil { + t.SpanId = &SpanIdConfig{} + } + if t.SpanId.ParseFrom == nil { + field := entry.NewBodyField("span_id") + t.SpanId.ParseFrom = &field + } + if t.TraceFlags == nil { + t.TraceFlags = &TraceFlagsConfig{} + } + if t.TraceFlags.ParseFrom == nil { + field := entry.NewBodyField("trace_flags") + t.TraceFlags.ParseFrom = &field + } + return nil +} + +// Best effort hex parsing for trace, spans and flags +func parseHexField(entry *entry.Entry, field *entry.Field, to *entry.Field) ([]byte, error) { + value, ok := entry.Delete(field) + if !ok { + return nil, nil + } + + data, err := hex.DecodeString(fmt.Sprintf("%v", value)) + if err != nil { + return nil, err + } + + if to != nil { + err = entry.Set(to, value) + return data, err + } + return data, nil +} + +// Parse will parse a trace (trace_id, span_id and flags) from a field and attach it to the entry +func (t *TraceParser) Parse(entry *entry.Entry) error { + var errTraceId, errSpanId, errTraceFlags error + entry.TraceId, errTraceId = parseHexField(entry, t.TraceId.ParseFrom, t.TraceId.PreserveTo) + entry.SpanId, errSpanId = parseHexField(entry, t.SpanId.ParseFrom, t.SpanId.PreserveTo) + entry.TraceFlags, errTraceFlags = parseHexField(entry, t.TraceFlags.ParseFrom, t.TraceFlags.PreserveTo) + if errTraceId != nil || errTraceFlags != nil || errSpanId != nil { + err := errors.NewError("Error decoding traces for logs", "") + if errTraceId != nil { + _ = err.WithDetails("trace_id", errTraceId.Error()) + } + if errSpanId != nil { + _ = err.WithDetails("span_id", errSpanId.Error()) + } + if errTraceFlags != nil { + _ = err.WithDetails("trace_flags", errTraceFlags.Error()) + } + return err + } + return nil +} diff --git a/operator/helper/trace_test.go b/operator/helper/trace_test.go new file mode 100644 index 00000000..d3ce373f --- /dev/null +++ b/operator/helper/trace_test.go @@ -0,0 +1,99 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package helper + +import ( + "encoding/hex" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-log-collection/entry" + "github.com/open-telemetry/opentelemetry-log-collection/testutil" +) + +func TestValidateDoesntChangeFields(t *testing.T) { + traceId := entry.NewBodyField("traceId") + spanId := entry.NewBodyField("spanId") + traceFlags := entry.NewBodyField("traceFlags") + parser := TraceParser{ + TraceId: &TraceIdConfig{ + ParseFrom: &traceId, + }, + SpanId: &SpanIdConfig{ + ParseFrom: &spanId, + }, + TraceFlags: &TraceFlagsConfig{ + ParseFrom: &traceFlags, + }, + } + err := parser.Validate(testutil.NewBuildContext(t)) + require.NoError(t, err) + require.Equal(t, &traceId, parser.TraceId.ParseFrom) + require.Equal(t, &spanId, parser.SpanId.ParseFrom) + require.Equal(t, &traceFlags, parser.TraceFlags.ParseFrom) +} + +func TestValidateSetsDefaultFields(t *testing.T) { + traceId := entry.NewBodyField("trace_id") + spanId := entry.NewBodyField("span_id") + traceFlags := entry.NewBodyField("trace_flags") + parser := TraceParser{} + err := parser.Validate(testutil.NewBuildContext(t)) + require.NoError(t, err) + require.Equal(t, &traceId, parser.TraceId.ParseFrom) + require.Equal(t, &spanId, parser.SpanId.ParseFrom) + require.Equal(t, &traceFlags, parser.TraceFlags.ParseFrom) +} + +func TestPreserveFields(t *testing.T) { + traceId := entry.NewBodyField("traceId") + spanId := entry.NewBodyField("spanId") + traceFlags := entry.NewBodyField("traceFlags") + parser := TraceParser{ + TraceId: &TraceIdConfig{ + PreserveTo: &traceId, + }, + SpanId: &SpanIdConfig{ + PreserveTo: &spanId, + }, + TraceFlags: &TraceFlagsConfig{ + PreserveTo: &traceFlags, + }, + } + err := parser.Validate(testutil.NewBuildContext(t)) + require.NoError(t, err) + + entry := entry.New() + entry.Body = map[string]interface{}{ + "trace_id": "480140f3d770a5ae32f0a22b6a812cff", + "span_id": "92c3792d54ba94f3", + "trace_flags": "01", + } + err = parser.Parse(entry) + require.NoError(t, err) + require.Equal(t, map[string]interface{}{ + "traceId": "480140f3d770a5ae32f0a22b6a812cff", + "spanId": "92c3792d54ba94f3", + "traceFlags": "01", + }, entry.Body) + + value, _ := hex.DecodeString("480140f3d770a5ae32f0a22b6a812cff") + require.Equal(t, value, entry.TraceId) + value, _ = hex.DecodeString("92c3792d54ba94f3") + require.Equal(t, value, entry.SpanId) + value, _ = hex.DecodeString("01") + require.Equal(t, value, entry.TraceFlags) +}