Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

Add trace attributes and parser (trace_id, span_id and trace_flags) #76

Merged
merged 1 commit into from
Apr 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions entry/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type Entry struct {
Attributes map[string]string `json:"attributes,omitempty" yaml:"attributes,omitempty"`
Resource map[string]string `json:"resource,omitempty" yaml:"resource,omitempty"`
Body interface{} `json:"body" yaml:"body"`
TraceId []byte `json:"trace_id,omitempty" yaml:"trace_id,omitempty"`
SpanId []byte `json:"span_id,omitempty" yaml:"span_id,omitempty"`
TraceFlags []byte `json:"trace_flags,omitempty" yaml:"trace_flags,omitempty"`
}

// New will create a new log entry with current timestamp and an empty body.
Expand Down Expand Up @@ -196,5 +199,8 @@ func (entry *Entry) Copy() *Entry {
Attributes: copyStringMap(entry.Attributes),
Resource: copyStringMap(entry.Resource),
Body: copyValue(entry.Body),
TraceId: copyByteArray(entry.TraceId),
SpanId: copyByteArray(entry.SpanId),
TraceFlags: copyByteArray(entry.TraceFlags),
}
}
35 changes: 35 additions & 0 deletions entry/entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ func TestCopy(t *testing.T) {
entry.Body = "test"
entry.Attributes = map[string]string{"label": "value"}
entry.Resource = map[string]string{"resource": "value"}
entry.TraceId = []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f}
entry.SpanId = []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}
entry.TraceFlags = []byte{0x01}
copy := entry.Copy()

entry.Severity = Severity(1)
Expand All @@ -149,13 +152,45 @@ func TestCopy(t *testing.T) {
entry.Body = "new"
entry.Attributes = map[string]string{"label": "new value"}
entry.Resource = map[string]string{"resource": "new value"}
entry.TraceId[0] = 0xff
entry.SpanId[0] = 0xff
entry.TraceFlags[0] = 0xff

require.Equal(t, time.Time{}, copy.Timestamp)
require.Equal(t, Severity(0), copy.Severity)
require.Equal(t, "ok", copy.SeverityText)
require.Equal(t, map[string]string{"label": "value"}, copy.Attributes)
require.Equal(t, map[string]string{"resource": "value"}, copy.Resource)
require.Equal(t, "test", copy.Body)
require.Equal(t, []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f}, copy.TraceId)
require.Equal(t, []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}, copy.SpanId)
require.Equal(t, []byte{0x01}, copy.TraceFlags)
}

func TestCopyNil(t *testing.T) {
entry := New()
entry.Timestamp = time.Time{}
copy := entry.Copy()

entry.Severity = Severity(1)
entry.SeverityText = "1"
entry.Timestamp = time.Now()
entry.Body = "new"
entry.Attributes = map[string]string{"label": "new value"}
entry.Resource = map[string]string{"resource": "new value"}
entry.TraceId = []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f}
entry.SpanId = []byte{0x04, 0x05, 0x06, 0x07, 0x08, 0x00, 0x01, 0x02, 0x03}
entry.TraceFlags = []byte{0x01}

require.Equal(t, time.Time{}, copy.Timestamp)
require.Equal(t, Severity(0), copy.Severity)
require.Equal(t, "", copy.SeverityText)
require.Equal(t, map[string]string{}, copy.Attributes)
require.Equal(t, map[string]string{}, copy.Resource)
require.Equal(t, nil, copy.Body)
require.Equal(t, []byte{}, copy.TraceId)
require.Equal(t, []byte{}, copy.SpanId)
require.Equal(t, []byte{}, copy.TraceFlags)
}

func TestFieldFromString(t *testing.T) {
Expand Down
71 changes: 71 additions & 0 deletions operator/builtin/parser/trace/trace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package trace

import (
"context"

"github.com/open-telemetry/opentelemetry-log-collection/entry"
"github.com/open-telemetry/opentelemetry-log-collection/operator"
"github.com/open-telemetry/opentelemetry-log-collection/operator/helper"
)

func init() {
operator.Register("trace_parser", func() operator.Builder { return NewTraceParserConfig("") })
}

// NewTraceParserConfig creates a new trace parser config with default values
func NewTraceParserConfig(operatorID string) *TraceParserConfig {
return &TraceParserConfig{
TransformerConfig: helper.NewTransformerConfig(operatorID, "trace_parser"),
TraceParser: helper.NewTraceParser(),
}
}

// TraceParserConfig is the configuration of a trace parser operator.
type TraceParserConfig struct {
helper.TransformerConfig `mapstructure:",squash" yaml:",inline"`
helper.TraceParser `mapstructure:",omitempty,squash" yaml:",omitempty,inline"`
}

// Build will build a trace parser operator.
func (c TraceParserConfig) Build(context operator.BuildContext) ([]operator.Operator, error) {
transformerOperator, err := c.TransformerConfig.Build(context)
if err != nil {
return nil, err
}

if err := c.TraceParser.Validate(context); err != nil {
return nil, err
}

traceOperator := &TraceParserOperator{
TransformerOperator: transformerOperator,
TraceParser: c.TraceParser,
}

return []operator.Operator{traceOperator}, nil
}

// TraceParserConfig is an operator that parses traces from fields to an entry.
type TraceParserOperator struct {
helper.TransformerOperator
helper.TraceParser
}

// Process will parse traces from an entry.
func (p *TraceParserOperator) Process(ctx context.Context, entry *entry.Entry) error {
return p.ProcessWith(ctx, entry, p.Parse)
}
158 changes: 158 additions & 0 deletions operator/builtin/parser/trace/trace_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package trace

import (
"encoding/hex"
"testing"

"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-log-collection/entry"
"github.com/open-telemetry/opentelemetry-log-collection/testutil"
)

func TestDefaultParser(t *testing.T) {
traceParserConfig := NewTraceParserConfig("")
_, err := traceParserConfig.Build(testutil.NewBuildContext(t))
require.NoError(t, err)
}

func TestTraceParserParse(t *testing.T) {
cases := []struct {
name string
inputRecord map[string]interface{}
expectedRecord map[string]interface{}
expectErr bool
traceId string
spanId string
traceFlags string
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
}{
{
"AllFields",
map[string]interface{}{
"trace_id": "480140f3d770a5ae32f0a22b6a812cff",
"span_id": "92c3792d54ba94f3",
"trace_flags": "01",
},
map[string]interface{}{},
false,
"480140f3d770a5ae32f0a22b6a812cff",
"92c3792d54ba94f3",
"01",
},
{
"WrongFields",
map[string]interface{}{
"traceId": "480140f3d770a5ae32f0a22b6a812cff",
"traceFlags": "01",
"spanId": "92c3792d54ba94f3",
},
map[string]interface{}{
"traceId": "480140f3d770a5ae32f0a22b6a812cff",
"spanId": "92c3792d54ba94f3",
"traceFlags": "01",
},
false,
"",
"",
"",
},
{
"OnlyTraceId",
map[string]interface{}{
"trace_id": "480140f3d770a5ae32f0a22b6a812cff",
},
map[string]interface{}{},
false,
"480140f3d770a5ae32f0a22b6a812cff",
"",
"",
},
{
"WrongTraceIdFormat",
map[string]interface{}{
"trace_id": "foo_bar",
"span_id": "92c3792d54ba94f3",
"trace_flags": "01",
},
map[string]interface{}{},
true,
"",
"92c3792d54ba94f3",
"01",
},
{
"WrongTraceFlagFormat",
map[string]interface{}{
"trace_id": "480140f3d770a5ae32f0a22b6a812cff",
"span_id": "92c3792d54ba94f3",
"trace_flags": "foo_bar",
},
map[string]interface{}{},
true,
"480140f3d770a5ae32f0a22b6a812cff",
"92c3792d54ba94f3",
"",
},
{
"AllFields",
map[string]interface{}{
"trace_id": "480140f3d770a5ae32f0a22b6a812cff",
"span_id": "92c3792d54ba94f3",
"trace_flags": "01",
},
map[string]interface{}{},
false,
"480140f3d770a5ae32f0a22b6a812cff",
"92c3792d54ba94f3",
"01",
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
traceParserConfig := NewTraceParserConfig("")
_, _ = traceParserConfig.Build(testutil.NewBuildContext(t))
e := entry.New()
e.Body = tc.inputRecord
err := traceParserConfig.Parse(e)
if tc.expectErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
require.Equal(t, tc.expectedRecord, e.Body)
traceId, _ := hex.DecodeString(tc.traceId)
if len(tc.traceId) == 0 {
require.Nil(t, e.TraceId)
} else {
require.Equal(t, traceId, e.TraceId)
}
spanId, _ := hex.DecodeString(tc.spanId)
if len(tc.spanId) == 0 {
require.Nil(t, e.SpanId)
} else {
require.Equal(t, spanId, e.SpanId)
}
traceFlags, _ := hex.DecodeString(tc.traceFlags)
if len(tc.traceFlags) == 0 {
require.Nil(t, e.TraceFlags)
} else {
require.Equal(t, traceFlags, e.TraceFlags)
}
})
}
}
3 changes: 3 additions & 0 deletions operator/builtin/transformer/noop/noop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ func TestProcess(t *testing.T) {
entry := entry.New()
entry.AddAttribute("label", "value")
entry.AddResourceKey("resource", "value")
entry.TraceId = []byte{0x01}
entry.SpanId = []byte{0x01}
entry.TraceFlags = []byte{0x01}

expected := entry.Copy()
err = op.Process(context.Background(), entry)
Expand Down
17 changes: 17 additions & 0 deletions operator/helper/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type ParserConfig struct {
PreserveTo *entry.Field `mapstructure:"preserve_to" json:"preserve_to" yaml:"preserve_to"`
TimeParser *TimeParser `mapstructure:"timestamp,omitempty" json:"timestamp,omitempty" yaml:"timestamp,omitempty"`
SeverityParserConfig *SeverityParserConfig `mapstructure:"severity,omitempty" json:"severity,omitempty" yaml:"severity,omitempty"`
TraceParser *TraceParser `mapstructure:"trace,omitempty" json:"trace,omitempty" yaml:"trace,omitempty"`
}

// Build will build a parser operator.
Expand Down Expand Up @@ -72,6 +73,13 @@ func (c ParserConfig) Build(context operator.BuildContext) (ParserOperator, erro
parserOperator.SeverityParser = &severityParser
}

if c.TraceParser != nil {
if err := c.TraceParser.Validate(context); err != nil {
return ParserOperator{}, err
}
parserOperator.TraceParser = c.TraceParser
}

return parserOperator, nil
}

Expand All @@ -83,6 +91,7 @@ type ParserOperator struct {
PreserveTo *entry.Field
TimeParser *TimeParser
SeverityParser *SeverityParser
TraceParser *TraceParser
}

// ProcessWith will run ParseWith on the entry, then forward the entry on to the next operators.
Expand Down Expand Up @@ -154,13 +163,21 @@ func (p *ParserOperator) ParseWith(ctx context.Context, entry *entry.Entry, pars
severityParseErr = p.SeverityParser.Parse(entry)
}

var traceParseErr error
if p.TraceParser != nil {
traceParseErr = p.TraceParser.Parse(entry)
}

// Handle time or severity parsing errors after attempting to parse both
if timeParseErr != nil {
return p.HandleEntryError(ctx, entry, errors.Wrap(timeParseErr, "time parser"))
}
if severityParseErr != nil {
return p.HandleEntryError(ctx, entry, errors.Wrap(severityParseErr, "severity parser"))
}
if traceParseErr != nil {
return p.HandleEntryError(ctx, entry, errors.Wrap(traceParseErr, "trace parser"))
}
return nil
}

Expand Down
Loading