Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

Establish Autogenerated IDs for operators with unspecified IDs #246

Merged
merged 33 commits into from
Sep 8, 2021
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
2b013ec
Add auto generated IDs to operators
Mrod1598 May 3, 2021
a993f0b
Move generation spot
Mrod1598 May 7, 2021
d015377
Add Testing
Mrod1598 May 11, 2021
7e4cace
Update where IDs are changed
Mrod1598 May 12, 2021
7358fff
Add testing for OutputIDs
Mrod1598 May 12, 2021
93a93dd
Clean up diff
Mrod1598 May 12, 2021
3c268c5
Add Error handling
Mrod1598 May 12, 2021
7db86f8
Fix CI
Mrod1598 May 12, 2021
3034fc0
Add testing for IDs
Mrod1598 May 14, 2021
dcc2aa6
Test Ops is pipeline
Mrod1598 May 14, 2021
a09b457
Add testing template
Mrod1598 May 14, 2021
fe03272
Change how default outputs are set
Mrod1598 May 17, 2021
f01804d
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
Mrod1598 May 18, 2021
733ae20
Add testing for outputs
Mrod1598 May 18, 2021
1800ef9
Remove old default outputs func
Mrod1598 May 18, 2021
cc92043
Add ID testing
Mrod1598 May 19, 2021
260b951
Change how default outputs are set
Mrod1598 May 19, 2021
43c5b96
Add testing for output handling
Mrod1598 May 20, 2021
ac2b38c
Merge in main
Mrod1598 May 20, 2021
9782118
Update Tests
Mrod1598 May 20, 2021
d193899
Update mock comment
Mrod1598 May 20, 2021
dc78c19
Implement PR feedback
Mrod1598 May 21, 2021
705cc0c
Update comment
Mrod1598 May 21, 2021
37f8e0d
Implement PR feedback
Mrod1598 May 24, 2021
68a9e70
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
Mrod1598 May 24, 2021
3e97801
Merge In Plugin-input-fix
Mrod1598 May 24, 2021
b1c61b4
Merge in main
Mrod1598 May 24, 2021
1fab496
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
Mrod1598 Aug 24, 2021
fced5e3
WIP
Mrod1598 Aug 25, 2021
cfc67f7
Update test
Mrod1598 Aug 25, 2021
709d89b
Remove error return value
Mrod1598 Aug 25, 2021
68f6b1f
PR Feedback
Sep 7, 2021
99125fb
Rename test helper file
Sep 7, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions operator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Builder interface {
ID() string
Type() string
Build(BuildContext) ([]Operator, error)
SetID(string)
BuildsMultipleOps() bool
}

Expand Down
1 change: 1 addition & 0 deletions operator/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type FakeBuilder struct {
func (f *FakeBuilder) Build(context BuildContext) ([]Operator, error) { return nil, nil }
func (f *FakeBuilder) ID() string { return "plugin" }
func (f *FakeBuilder) Type() string { return "plugin" }
func (f *FakeBuilder) SetID(s string) {}
func (f *FakeBuilder) BuildsMultipleOps() bool { return false }

func TestUnmarshalJSONErrors(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions operator/helper/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ func (c BasicConfig) ID() string {
return c.OperatorID
}

// SetID will Update the operator id.
func (c *BasicConfig) SetID(id string) {
c.OperatorID = id
}

// Type will return the operator type.
func (c BasicConfig) Type() string {
return c.OperatorType
Expand Down
29 changes: 29 additions & 0 deletions pipeline/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package pipeline

import (
"fmt"

"github.com/open-telemetry/opentelemetry-log-collection/operator"
)

Expand All @@ -23,6 +25,7 @@ type Config []operator.Config

// BuildOperators builds the operators from the list of configs into operators.
func (c Config) BuildOperators(bc operator.BuildContext, defaultOperator operator.Operator) ([]operator.Operator, error) {
c.dedeplucateIDs()
// buildsMulti's key represents an operator's ID that builds multiple operators, e.g. Plugins.
// the value is the plugin's first operator's ID.
buildsMulti := make(map[string]string)
Expand Down Expand Up @@ -50,6 +53,32 @@ func (c Config) BuildOperators(bc operator.BuildContext, defaultOperator operato
return operators, nil
}

func (c Config) dedeplucateIDs() {
typeMap := make(map[string]int)
for _, op := range c {
if op.Type() != op.ID() {
continue
}

if typeMap[op.Type()] == 0 {
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
typeMap[op.Type()]++
continue
}
newID := fmt.Sprintf("%s%d", op.Type(), typeMap[op.Type()])

for j := 0; j < len(c); j++ {
if newID == c[j].ID() {
j = 0
typeMap[op.Type()]++
newID = fmt.Sprintf("%s%d", op.Type(), typeMap[op.Type()])
}
}

typeMap[op.Type()]++
op.SetID(newID)
}
}

// BuildPipeline will build a pipeline from the config.
func (c Config) BuildPipeline(bc operator.BuildContext, defaultOperator operator.Operator) (*DirectedPipeline, error) {
operators, err := c.BuildOperators(bc, defaultOperator)
Expand Down
278 changes: 278 additions & 0 deletions pipeline/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pipeline

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-log-collection/operator"
"github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/parser/json"
"github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/transformer/copy"
"github.com/open-telemetry/opentelemetry-log-collection/testutil"
)

func newDummyJSON(dummyID string) operator.Config {
return operator.Config{Builder: json.NewJSONParserConfig(dummyID)}
}

func newDummyCopy(dummyID string) operator.Config {
return operator.Config{Builder: copy.NewCopyOperatorConfig(dummyID)}
}

type deduplicateTestCase struct {
name string
ops func() Config
expectedOps Config
}

func TestDeduplicateIDs(t *testing.T) {
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
cases := []deduplicateTestCase{
{
"one_op_rename",
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
return ops
},
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser1"))
return ops
}(),
},
{
"multi_op_rename",
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))

return ops
},
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser1"))
ops = append(ops, newDummyJSON("json_parser2"))
ops = append(ops, newDummyJSON("json_parser3"))
ops = append(ops, newDummyJSON("json_parser4"))
return ops
}(),
},
{
"different_ops",
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyCopy("copy"))
ops = append(ops, newDummyCopy("copy"))

return ops
},
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser1"))
ops = append(ops, newDummyJSON("json_parser2"))
ops = append(ops, newDummyCopy("copy"))
ops = append(ops, newDummyCopy("copy1"))
return ops
}(),
},
{
"unordered",
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyCopy("copy"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyCopy("copy"))
ops = append(ops, newDummyJSON("json_parser"))
return ops
},
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyCopy("copy"))
ops = append(ops, newDummyJSON("json_parser1"))
ops = append(ops, newDummyCopy("copy1"))
ops = append(ops, newDummyJSON("json_parser2"))
return ops
}(),
},
{
"already_renamed",
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser3"))
ops = append(ops, newDummyJSON("json_parser"))
return ops
},
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser1"))
ops = append(ops, newDummyJSON("json_parser2"))
ops = append(ops, newDummyJSON("json_parser3"))
ops = append(ops, newDummyJSON("json_parser4"))
return ops
}(),
},
{
"iterate_twice",
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser3"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
return ops
},
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser1"))
ops = append(ops, newDummyJSON("json_parser3"))
ops = append(ops, newDummyJSON("json_parser2"))
ops = append(ops, newDummyJSON("json_parser4"))
return ops
}(),
},
}

for _, tc := range cases {
t.Run("Deduplicate/"+tc.name, func(t *testing.T) {
ops := tc.ops()
ops.dedeplucateIDs()
require.Equal(t, ops, tc.expectedOps)
})
}
}

type outputTestCase struct {
name string
ops func() Config
expectedOutputs []string
}

func TestUpdateOutputIDs(t *testing.T) {
cases := []outputTestCase{
{
"one_op_rename",
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
return ops
},
[]string{
"$.json_parser1",
},
},
{
"multi_op_rename",
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
return ops
},
[]string{
"$.json_parser1",
"$.json_parser2",
"$.json_parser3",
},
},
{
"different_ops",
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyCopy("copy"))
ops = append(ops, newDummyCopy("copy"))
return ops
},
[]string{
"$.json_parser1",
"$.json_parser2",
"$.copy",
"$.copy1",
},
},
{
"unordered",
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyCopy("copy"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyCopy("copy"))
return ops
},
[]string{
"$.copy",
"$.json_parser1",
"$.copy1",
},
},
{
"already_renamed",
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser3"))
ops = append(ops, newDummyJSON("json_parser"))
return ops
},
[]string{
"$.json_parser1",
"$.json_parser2",
"$.json_parser3",
"$.json_parser4",
},
},
}

for _, tc := range cases {
t.Run("UpdateOutputIDs/"+tc.name, func(t *testing.T) {
bc := testutil.NewBuildContext(t)
ops, err := tc.ops().BuildOperators(bc, nil)
require.NoError(t, err)
require.Equal(t, len(tc.expectedOutputs), len(ops)-1)
for i := 0; i < len(ops)-1; i++ {
require.Equal(t, tc.expectedOutputs[i], ops[i].GetOutputIDs()[0])
}
})
}
}
Loading