Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

Commit

Permalink
Add trace attributes and parser (trace_id, span_id and trace_flags) (#76
Browse files Browse the repository at this point in the history
)
  • Loading branch information
alexvanboxel authored Apr 6, 2021
1 parent e63fbfe commit 37eb654
Show file tree
Hide file tree
Showing 8 changed files with 520 additions and 0 deletions.
6 changes: 6 additions & 0 deletions entry/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type Entry struct {
Attributes map[string]string `json:"attributes,omitempty" yaml:"attributes,omitempty"`
Resource map[string]string `json:"resource,omitempty" yaml:"resource,omitempty"`
Body interface{} `json:"body" yaml:"body"`
TraceId []byte `json:"trace_id,omitempty" yaml:"trace_id,omitempty"`
SpanId []byte `json:"span_id,omitempty" yaml:"span_id,omitempty"`
TraceFlags []byte `json:"trace_flags,omitempty" yaml:"trace_flags,omitempty"`
}

// New will create a new log entry with current timestamp and an empty body.
Expand Down Expand Up @@ -196,5 +199,8 @@ func (entry *Entry) Copy() *Entry {
Attributes: copyStringMap(entry.Attributes),
Resource: copyStringMap(entry.Resource),
Body: copyValue(entry.Body),
TraceId: copyByteArray(entry.TraceId),
SpanId: copyByteArray(entry.SpanId),
TraceFlags: copyByteArray(entry.TraceFlags),
}
}
35 changes: 35 additions & 0 deletions entry/entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ func TestCopy(t *testing.T) {
entry.Body = "test"
entry.Attributes = map[string]string{"label": "value"}
entry.Resource = map[string]string{"resource": "value"}
entry.TraceId = []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f}
entry.SpanId = []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}
entry.TraceFlags = []byte{0x01}
copy := entry.Copy()

entry.Severity = Severity(1)
Expand All @@ -149,13 +152,45 @@ func TestCopy(t *testing.T) {
entry.Body = "new"
entry.Attributes = map[string]string{"label": "new value"}
entry.Resource = map[string]string{"resource": "new value"}
entry.TraceId[0] = 0xff
entry.SpanId[0] = 0xff
entry.TraceFlags[0] = 0xff

require.Equal(t, time.Time{}, copy.Timestamp)
require.Equal(t, Severity(0), copy.Severity)
require.Equal(t, "ok", copy.SeverityText)
require.Equal(t, map[string]string{"label": "value"}, copy.Attributes)
require.Equal(t, map[string]string{"resource": "value"}, copy.Resource)
require.Equal(t, "test", copy.Body)
require.Equal(t, []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f}, copy.TraceId)
require.Equal(t, []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}, copy.SpanId)
require.Equal(t, []byte{0x01}, copy.TraceFlags)
}

func TestCopyNil(t *testing.T) {
entry := New()
entry.Timestamp = time.Time{}
copy := entry.Copy()

entry.Severity = Severity(1)
entry.SeverityText = "1"
entry.Timestamp = time.Now()
entry.Body = "new"
entry.Attributes = map[string]string{"label": "new value"}
entry.Resource = map[string]string{"resource": "new value"}
entry.TraceId = []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f}
entry.SpanId = []byte{0x04, 0x05, 0x06, 0x07, 0x08, 0x00, 0x01, 0x02, 0x03}
entry.TraceFlags = []byte{0x01}

require.Equal(t, time.Time{}, copy.Timestamp)
require.Equal(t, Severity(0), copy.Severity)
require.Equal(t, "", copy.SeverityText)
require.Equal(t, map[string]string{}, copy.Attributes)
require.Equal(t, map[string]string{}, copy.Resource)
require.Equal(t, nil, copy.Body)
require.Equal(t, []byte{}, copy.TraceId)
require.Equal(t, []byte{}, copy.SpanId)
require.Equal(t, []byte{}, copy.TraceFlags)
}

func TestFieldFromString(t *testing.T) {
Expand Down
71 changes: 71 additions & 0 deletions operator/builtin/parser/trace/trace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package trace

import (
"context"

"github.com/open-telemetry/opentelemetry-log-collection/entry"
"github.com/open-telemetry/opentelemetry-log-collection/operator"
"github.com/open-telemetry/opentelemetry-log-collection/operator/helper"
)

func init() {
operator.Register("trace_parser", func() operator.Builder { return NewTraceParserConfig("") })
}

// NewTraceParserConfig creates a new trace parser config with default values
func NewTraceParserConfig(operatorID string) *TraceParserConfig {
return &TraceParserConfig{
TransformerConfig: helper.NewTransformerConfig(operatorID, "trace_parser"),
TraceParser: helper.NewTraceParser(),
}
}

// TraceParserConfig is the configuration of a trace parser operator.
type TraceParserConfig struct {
helper.TransformerConfig `mapstructure:",squash" yaml:",inline"`
helper.TraceParser `mapstructure:",omitempty,squash" yaml:",omitempty,inline"`
}

// Build will build a trace parser operator.
func (c TraceParserConfig) Build(context operator.BuildContext) ([]operator.Operator, error) {
transformerOperator, err := c.TransformerConfig.Build(context)
if err != nil {
return nil, err
}

if err := c.TraceParser.Validate(context); err != nil {
return nil, err
}

traceOperator := &TraceParserOperator{
TransformerOperator: transformerOperator,
TraceParser: c.TraceParser,
}

return []operator.Operator{traceOperator}, nil
}

// TraceParserConfig is an operator that parses traces from fields to an entry.
type TraceParserOperator struct {
helper.TransformerOperator
helper.TraceParser
}

// Process will parse traces from an entry.
func (p *TraceParserOperator) Process(ctx context.Context, entry *entry.Entry) error {
return p.ProcessWith(ctx, entry, p.Parse)
}
158 changes: 158 additions & 0 deletions operator/builtin/parser/trace/trace_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package trace

import (
"encoding/hex"
"testing"

"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-log-collection/entry"
"github.com/open-telemetry/opentelemetry-log-collection/testutil"
)

func TestDefaultParser(t *testing.T) {
traceParserConfig := NewTraceParserConfig("")
_, err := traceParserConfig.Build(testutil.NewBuildContext(t))
require.NoError(t, err)
}

func TestTraceParserParse(t *testing.T) {
cases := []struct {
name string
inputRecord map[string]interface{}
expectedRecord map[string]interface{}
expectErr bool
traceId string
spanId string
traceFlags string
}{
{
"AllFields",
map[string]interface{}{
"trace_id": "480140f3d770a5ae32f0a22b6a812cff",
"span_id": "92c3792d54ba94f3",
"trace_flags": "01",
},
map[string]interface{}{},
false,
"480140f3d770a5ae32f0a22b6a812cff",
"92c3792d54ba94f3",
"01",
},
{
"WrongFields",
map[string]interface{}{
"traceId": "480140f3d770a5ae32f0a22b6a812cff",
"traceFlags": "01",
"spanId": "92c3792d54ba94f3",
},
map[string]interface{}{
"traceId": "480140f3d770a5ae32f0a22b6a812cff",
"spanId": "92c3792d54ba94f3",
"traceFlags": "01",
},
false,
"",
"",
"",
},
{
"OnlyTraceId",
map[string]interface{}{
"trace_id": "480140f3d770a5ae32f0a22b6a812cff",
},
map[string]interface{}{},
false,
"480140f3d770a5ae32f0a22b6a812cff",
"",
"",
},
{
"WrongTraceIdFormat",
map[string]interface{}{
"trace_id": "foo_bar",
"span_id": "92c3792d54ba94f3",
"trace_flags": "01",
},
map[string]interface{}{},
true,
"",
"92c3792d54ba94f3",
"01",
},
{
"WrongTraceFlagFormat",
map[string]interface{}{
"trace_id": "480140f3d770a5ae32f0a22b6a812cff",
"span_id": "92c3792d54ba94f3",
"trace_flags": "foo_bar",
},
map[string]interface{}{},
true,
"480140f3d770a5ae32f0a22b6a812cff",
"92c3792d54ba94f3",
"",
},
{
"AllFields",
map[string]interface{}{
"trace_id": "480140f3d770a5ae32f0a22b6a812cff",
"span_id": "92c3792d54ba94f3",
"trace_flags": "01",
},
map[string]interface{}{},
false,
"480140f3d770a5ae32f0a22b6a812cff",
"92c3792d54ba94f3",
"01",
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
traceParserConfig := NewTraceParserConfig("")
_, _ = traceParserConfig.Build(testutil.NewBuildContext(t))
e := entry.New()
e.Body = tc.inputRecord
err := traceParserConfig.Parse(e)
if tc.expectErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
require.Equal(t, tc.expectedRecord, e.Body)
traceId, _ := hex.DecodeString(tc.traceId)
if len(tc.traceId) == 0 {
require.Nil(t, e.TraceId)
} else {
require.Equal(t, traceId, e.TraceId)
}
spanId, _ := hex.DecodeString(tc.spanId)
if len(tc.spanId) == 0 {
require.Nil(t, e.SpanId)
} else {
require.Equal(t, spanId, e.SpanId)
}
traceFlags, _ := hex.DecodeString(tc.traceFlags)
if len(tc.traceFlags) == 0 {
require.Nil(t, e.TraceFlags)
} else {
require.Equal(t, traceFlags, e.TraceFlags)
}
})
}
}
3 changes: 3 additions & 0 deletions operator/builtin/transformer/noop/noop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ func TestProcess(t *testing.T) {
entry := entry.New()
entry.AddAttribute("label", "value")
entry.AddResourceKey("resource", "value")
entry.TraceId = []byte{0x01}
entry.SpanId = []byte{0x01}
entry.TraceFlags = []byte{0x01}

expected := entry.Copy()
err = op.Process(context.Background(), entry)
Expand Down
17 changes: 17 additions & 0 deletions operator/helper/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type ParserConfig struct {
PreserveTo *entry.Field `mapstructure:"preserve_to" json:"preserve_to" yaml:"preserve_to"`
TimeParser *TimeParser `mapstructure:"timestamp,omitempty" json:"timestamp,omitempty" yaml:"timestamp,omitempty"`
SeverityParserConfig *SeverityParserConfig `mapstructure:"severity,omitempty" json:"severity,omitempty" yaml:"severity,omitempty"`
TraceParser *TraceParser `mapstructure:"trace,omitempty" json:"trace,omitempty" yaml:"trace,omitempty"`
}

// Build will build a parser operator.
Expand Down Expand Up @@ -72,6 +73,13 @@ func (c ParserConfig) Build(context operator.BuildContext) (ParserOperator, erro
parserOperator.SeverityParser = &severityParser
}

if c.TraceParser != nil {
if err := c.TraceParser.Validate(context); err != nil {
return ParserOperator{}, err
}
parserOperator.TraceParser = c.TraceParser
}

return parserOperator, nil
}

Expand All @@ -83,6 +91,7 @@ type ParserOperator struct {
PreserveTo *entry.Field
TimeParser *TimeParser
SeverityParser *SeverityParser
TraceParser *TraceParser
}

// ProcessWith will run ParseWith on the entry, then forward the entry on to the next operators.
Expand Down Expand Up @@ -154,13 +163,21 @@ func (p *ParserOperator) ParseWith(ctx context.Context, entry *entry.Entry, pars
severityParseErr = p.SeverityParser.Parse(entry)
}

var traceParseErr error
if p.TraceParser != nil {
traceParseErr = p.TraceParser.Parse(entry)
}

// Handle time or severity parsing errors after attempting to parse both
if timeParseErr != nil {
return p.HandleEntryError(ctx, entry, errors.Wrap(timeParseErr, "time parser"))
}
if severityParseErr != nil {
return p.HandleEntryError(ctx, entry, errors.Wrap(severityParseErr, "severity parser"))
}
if traceParseErr != nil {
return p.HandleEntryError(ctx, entry, errors.Wrap(traceParseErr, "trace parser"))
}
return nil
}

Expand Down
Loading

0 comments on commit 37eb654

Please sign in to comment.