Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

Commit

Permalink
Add trace attributes and parser (trace_id, span_id and trace_flags)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexvanboxel committed Apr 4, 2021
1 parent 0356f77 commit 2401f6d
Show file tree
Hide file tree
Showing 8 changed files with 483 additions and 1 deletion.
6 changes: 6 additions & 0 deletions entry/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type Entry struct {
Attributes map[string]string `json:"attributes,omitempty" yaml:"attributes,omitempty"`
Resource map[string]string `json:"resource,omitempty" yaml:"resource,omitempty"`
Record interface{} `json:"record" yaml:"record"`
TraceId []byte `json:"trace_id" yaml:"trace_id"`
SpanId []byte `json:"span_id" yaml:"span_id"`
TraceFlags []byte `json:"trace_flags" yaml:"trace_flags"`
}

// New will create a new log entry with current timestamp and an empty record.
Expand Down Expand Up @@ -196,5 +199,8 @@ func (entry *Entry) Copy() *Entry {
Attributes: copyStringMap(entry.Attributes),
Resource: copyStringMap(entry.Resource),
Record: copyValue(entry.Record),
TraceId: entry.TraceId,
SpanId: entry.SpanId,
TraceFlags: entry.TraceFlags,
}
}
9 changes: 9 additions & 0 deletions entry/entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ func TestCopy(t *testing.T) {
entry.Record = "test"
entry.Attributes = map[string]string{"label": "value"}
entry.Resource = map[string]string{"resource": "value"}
entry.TraceId = []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f}
entry.SpanId = []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}
entry.TraceFlags = []byte{0x01}
copy := entry.Copy()

entry.Severity = Severity(1)
Expand All @@ -149,13 +152,19 @@ func TestCopy(t *testing.T) {
entry.Record = "new"
entry.Attributes = map[string]string{"label": "new value"}
entry.Resource = map[string]string{"resource": "new value"}
entry.TraceId = []byte{0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}
entry.SpanId = []byte{0x04, 0x05, 0x06, 0x07, 0x08, 0x00, 0x01, 0x02, 0x03}
entry.TraceFlags = []byte{0x00}

require.Equal(t, time.Time{}, copy.Timestamp)
require.Equal(t, Severity(0), copy.Severity)
require.Equal(t, "ok", copy.SeverityText)
require.Equal(t, map[string]string{"label": "value"}, copy.Attributes)
require.Equal(t, map[string]string{"resource": "value"}, copy.Resource)
require.Equal(t, "test", copy.Record)
require.Equal(t, []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f}, copy.TraceId)
require.Equal(t, []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}, copy.SpanId)
require.Equal(t, []byte{0x01}, copy.TraceFlags)
}

func TestFieldFromString(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion operator/builtin/output/stdout/stdout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,6 @@ func TestStdoutOperator(t *testing.T) {
marshalledTimestamp, err := json.Marshal(ts)
require.NoError(t, err)

expected := `{"timestamp":` + string(marshalledTimestamp) + `,"severity":0,"record":"test record"}` + "\n"
expected := `{"timestamp":` + string(marshalledTimestamp) + `,"severity":0,"record":"test record","trace_id":null,"span_id":null,"trace_flags":null}` + "\n"
require.Equal(t, expected, buf.String())
}
71 changes: 71 additions & 0 deletions operator/builtin/parser/trace/trace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package trace

import (
"context"

"github.com/open-telemetry/opentelemetry-log-collection/entry"
"github.com/open-telemetry/opentelemetry-log-collection/operator"
"github.com/open-telemetry/opentelemetry-log-collection/operator/helper"
)

func init() {
operator.Register("trace_parser", func() operator.Builder { return NewTraceParserConfig("") })
}

// NewTraceParserConfig creates a new trace parser config with default values
func NewTraceParserConfig(operatorID string) *TraceParserConfig {
return &TraceParserConfig{
TransformerConfig: helper.NewTransformerConfig(operatorID, "trace_parser"),
TraceParser: helper.NewTraceParser(),
}
}

// SeverityParserConfig is the configuration of a severity parser operator.
type TraceParserConfig struct {
helper.TransformerConfig `mapstructure:",squash" yaml:",inline"`
helper.TraceParser `mapstructure:",omitempty,squash" yaml:",omitempty,inline"`
}

// Build will build a time parser operator.
func (c TraceParserConfig) Build(context operator.BuildContext) ([]operator.Operator, error) {
transformerOperator, err := c.TransformerConfig.Build(context)
if err != nil {
return nil, err
}

if err := c.TraceParser.Validate(context); err != nil {
return nil, err
}

traceOperator := &TraceParserOperator{
TransformerOperator: transformerOperator,
TraceParser: c.TraceParser,
}

return []operator.Operator{traceOperator}, nil
}

// SeverityParserOperator is an operator that parses time from a field to an entry.
type TraceParserOperator struct {
helper.TransformerOperator
helper.TraceParser
}

// Process will parse time from an entry.
func (p *TraceParserOperator) Process(ctx context.Context, entry *entry.Entry) error {
return p.ProcessWith(ctx, entry, p.Parse)
}
154 changes: 154 additions & 0 deletions operator/builtin/parser/trace/trace_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package trace

import (
"encoding/hex"
"testing"

"github.com/open-telemetry/opentelemetry-log-collection/entry"
"github.com/open-telemetry/opentelemetry-log-collection/testutil"
"github.com/stretchr/testify/require"
)

func TestDefaultParser(t *testing.T) {
traceParserConfig := NewTraceParserConfig("")
_, err := traceParserConfig.Build(testutil.NewBuildContext(t))
require.NoError(t, err)
}

func TestTraceParserParse(t *testing.T) {
cases := []struct {
name string
inputRecord map[string]interface{}
expectedRecord map[string]interface{}
expectErr bool
traceId string
spanId string
traceFlags string
}{
{
"AllFields",
map[string]interface{}{
"trace_id": "480140f3d770a5ae32f0a22b6a812cff",
"span_id": "92c3792d54ba94f3",
"trace_flags": "01",
},
map[string]interface{}{},
false,
"480140f3d770a5ae32f0a22b6a812cff",
"92c3792d54ba94f3",
"01",
},
{
"WrongFields",
map[string]interface{}{
"traceId": "480140f3d770a5ae32f0a22b6a812cff",
"traceFlags": "01",
"spanId": "92c3792d54ba94f3",
},
map[string]interface{}{
"traceId": "480140f3d770a5ae32f0a22b6a812cff",
"spanId": "92c3792d54ba94f3",
"traceFlags": "01",
},
false,
"",
"",
"",
},
{
"OnlyTraceId",
map[string]interface{}{
"trace_id": "480140f3d770a5ae32f0a22b6a812cff",
},
map[string]interface{}{},
false,
"480140f3d770a5ae32f0a22b6a812cff",
"",
"",
},
{
"WrongTraceIdFormat",
map[string]interface{}{
"trace_id": "foo_bar",
"span_id": "92c3792d54ba94f3",
"trace_flags": "01",
},
map[string]interface{}{},
true,
"",
"92c3792d54ba94f3",
"01",
},
{
"WrongTraceFlagFormat",
map[string]interface{}{
"trace_id": "480140f3d770a5ae32f0a22b6a812cff",
"span_id": "92c3792d54ba94f3",
"trace_flags": "foo_bar",
},
map[string]interface{}{},
true,
"480140f3d770a5ae32f0a22b6a812cff",
"92c3792d54ba94f3",
"",
},
{
"AllFields",
map[string]interface{}{
"trace_id": "480140f3d770a5ae32f0a22b6a812cff",
"span_id": "92c3792d54ba94f3",
"trace_flags": "01",
},
map[string]interface{}{},
false,
"480140f3d770a5ae32f0a22b6a812cff",
"92c3792d54ba94f3",
"01",
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
traceParserConfig := NewTraceParserConfig("")
_, _ = traceParserConfig.Build(testutil.NewBuildContext(t))
e := entry.New()
e.Record = tc.inputRecord
err := traceParserConfig.Parse(e)
if tc.expectErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
require.Equal(t, tc.expectedRecord, e.Record)
traceId, _ := hex.DecodeString(tc.traceId)
if len(tc.traceId) == 0 {
traceId = nil
}
spanId, _ := hex.DecodeString(tc.spanId)
if len(tc.spanId) == 0 {
spanId = nil
}
traceFlags, _ := hex.DecodeString(tc.traceFlags)
if len(tc.traceFlags) == 0 {
traceFlags = nil
}
require.Equal(t, traceId, e.TraceId)
require.Equal(t, spanId, e.SpanId)
require.Equal(t, traceFlags, e.TraceFlags)
})
}
}
13 changes: 13 additions & 0 deletions operator/helper/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type ParserConfig struct {
PreserveTo *entry.Field `mapstructure:"preserve_to" json:"preserve_to" yaml:"preserve_to"`
TimeParser *TimeParser `mapstructure:"timestamp,omitempty" json:"timestamp,omitempty" yaml:"timestamp,omitempty"`
SeverityParserConfig *SeverityParserConfig `mapstructure:"severity,omitempty" json:"severity,omitempty" yaml:"severity,omitempty"`
TraceParser *TraceParser `mapstructure:"trace,omitempty" json:"trace,omitempty" yaml:"trace,omitempty"`
}

// Build will build a parser operator.
Expand Down Expand Up @@ -72,6 +73,13 @@ func (c ParserConfig) Build(context operator.BuildContext) (ParserOperator, erro
parserOperator.SeverityParser = &severityParser
}

if c.TraceParser != nil {
if err := c.TraceParser.Validate(context); err != nil {
return ParserOperator{}, err
}
parserOperator.TraceParser = c.TraceParser
}

return parserOperator, nil
}

Expand All @@ -83,6 +91,7 @@ type ParserOperator struct {
PreserveTo *entry.Field
TimeParser *TimeParser
SeverityParser *SeverityParser
TraceParser *TraceParser
}

// ProcessWith will run ParseWith on the entry, then forward the entry on to the next operators.
Expand Down Expand Up @@ -154,6 +163,10 @@ func (p *ParserOperator) ParseWith(ctx context.Context, entry *entry.Entry, pars
severityParseErr = p.SeverityParser.Parse(entry)
}

if p.TraceParser != nil {
_ = p.TraceParser.Parse(entry)
}

// Handle time or severity parsing errors after attempting to parse both
if timeParseErr != nil {
return p.HandleEntryError(ctx, entry, errors.Wrap(timeParseErr, "time parser"))
Expand Down
Loading

0 comments on commit 2401f6d

Please sign in to comment.