Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

Commit

Permalink
Establish Autogenerated IDs for operators with unspecified IDs (#246)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mrod1598 authored Sep 8, 2021
1 parent d0c9d8d commit 049d0cb
Show file tree
Hide file tree
Showing 7 changed files with 430 additions and 267 deletions.
1 change: 1 addition & 0 deletions operator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Builder interface {
ID() string
Type() string
Build(BuildContext) ([]Operator, error)
SetID(string)
BuildsMultipleOps() bool
}

Expand Down
1 change: 1 addition & 0 deletions operator/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type FakeBuilder struct {
func (f *FakeBuilder) Build(context BuildContext) ([]Operator, error) { return nil, nil }
func (f *FakeBuilder) ID() string { return "plugin" }
func (f *FakeBuilder) Type() string { return "plugin" }
func (f *FakeBuilder) SetID(s string) {}
func (f *FakeBuilder) BuildsMultipleOps() bool { return false }

func TestUnmarshalJSONErrors(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions operator/helper/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ func (c BasicConfig) ID() string {
return c.OperatorID
}

// SetID will Update the operator id.
func (c *BasicConfig) SetID(id string) {
c.OperatorID = id
}

// Type will return the operator type.
func (c BasicConfig) Type() string {
return c.OperatorType
Expand Down
29 changes: 29 additions & 0 deletions pipeline/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package pipeline

import (
"fmt"

"github.com/open-telemetry/opentelemetry-log-collection/operator"
)

Expand All @@ -23,6 +25,7 @@ type Config []operator.Config

// BuildOperators builds the operators from the list of configs into operators.
func (c Config) BuildOperators(bc operator.BuildContext, defaultOperator operator.Operator) ([]operator.Operator, error) {
c.dedeplucateIDs()
// buildsMulti's key represents an operator's ID that builds multiple operators, e.g. Plugins.
// the value is the plugin's first operator's ID.
buildsMulti := make(map[string]string)
Expand Down Expand Up @@ -50,6 +53,32 @@ func (c Config) BuildOperators(bc operator.BuildContext, defaultOperator operato
return operators, nil
}

func (c Config) dedeplucateIDs() {
typeMap := make(map[string]int)
for _, op := range c {
if op.Type() != op.ID() {
continue
}

if typeMap[op.Type()] == 0 {
typeMap[op.Type()]++
continue
}
newID := fmt.Sprintf("%s%d", op.Type(), typeMap[op.Type()])

for j := 0; j < len(c); j++ {
if newID == c[j].ID() {
j = 0
typeMap[op.Type()]++
newID = fmt.Sprintf("%s%d", op.Type(), typeMap[op.Type()])
}
}

typeMap[op.Type()]++
op.SetID(newID)
}
}

// BuildPipeline will build a pipeline from the config.
func (c Config) BuildPipeline(bc operator.BuildContext, defaultOperator operator.Operator) (*DirectedPipeline, error) {
operators, err := c.BuildOperators(bc, defaultOperator)
Expand Down
278 changes: 278 additions & 0 deletions pipeline/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pipeline

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-log-collection/operator"
"github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/parser/json"
"github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/transformer/copy"
"github.com/open-telemetry/opentelemetry-log-collection/testutil"
)

func newDummyJSON(dummyID string) operator.Config {
return operator.Config{Builder: json.NewJSONParserConfig(dummyID)}
}

func newDummyCopy(dummyID string) operator.Config {
return operator.Config{Builder: copy.NewCopyOperatorConfig(dummyID)}
}

type deduplicateTestCase struct {
name string
ops func() Config
expectedOps Config
}

func TestDeduplicateIDs(t *testing.T) {
cases := []deduplicateTestCase{
{
"one_op_rename",
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
return ops
},
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser1"))
return ops
}(),
},
{
"multi_op_rename",
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))

return ops
},
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser1"))
ops = append(ops, newDummyJSON("json_parser2"))
ops = append(ops, newDummyJSON("json_parser3"))
ops = append(ops, newDummyJSON("json_parser4"))
return ops
}(),
},
{
"different_ops",
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyCopy("copy"))
ops = append(ops, newDummyCopy("copy"))

return ops
},
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser1"))
ops = append(ops, newDummyJSON("json_parser2"))
ops = append(ops, newDummyCopy("copy"))
ops = append(ops, newDummyCopy("copy1"))
return ops
}(),
},
{
"unordered",
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyCopy("copy"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyCopy("copy"))
ops = append(ops, newDummyJSON("json_parser"))
return ops
},
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyCopy("copy"))
ops = append(ops, newDummyJSON("json_parser1"))
ops = append(ops, newDummyCopy("copy1"))
ops = append(ops, newDummyJSON("json_parser2"))
return ops
}(),
},
{
"already_renamed",
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser3"))
ops = append(ops, newDummyJSON("json_parser"))
return ops
},
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser1"))
ops = append(ops, newDummyJSON("json_parser2"))
ops = append(ops, newDummyJSON("json_parser3"))
ops = append(ops, newDummyJSON("json_parser4"))
return ops
}(),
},
{
"iterate_twice",
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser3"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
return ops
},
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser1"))
ops = append(ops, newDummyJSON("json_parser3"))
ops = append(ops, newDummyJSON("json_parser2"))
ops = append(ops, newDummyJSON("json_parser4"))
return ops
}(),
},
}

for _, tc := range cases {
t.Run("Deduplicate/"+tc.name, func(t *testing.T) {
ops := tc.ops()
ops.dedeplucateIDs()
require.Equal(t, ops, tc.expectedOps)
})
}
}

type outputTestCase struct {
name string
ops func() Config
expectedOutputs []string
}

func TestUpdateOutputIDs(t *testing.T) {
cases := []outputTestCase{
{
"one_op_rename",
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
return ops
},
[]string{
"$.json_parser1",
},
},
{
"multi_op_rename",
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
return ops
},
[]string{
"$.json_parser1",
"$.json_parser2",
"$.json_parser3",
},
},
{
"different_ops",
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyCopy("copy"))
ops = append(ops, newDummyCopy("copy"))
return ops
},
[]string{
"$.json_parser1",
"$.json_parser2",
"$.copy",
"$.copy1",
},
},
{
"unordered",
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyCopy("copy"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyCopy("copy"))
return ops
},
[]string{
"$.copy",
"$.json_parser1",
"$.copy1",
},
},
{
"already_renamed",
func() Config {
var ops Config
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser"))
ops = append(ops, newDummyJSON("json_parser3"))
ops = append(ops, newDummyJSON("json_parser"))
return ops
},
[]string{
"$.json_parser1",
"$.json_parser2",
"$.json_parser3",
"$.json_parser4",
},
},
}

for _, tc := range cases {
t.Run("UpdateOutputIDs/"+tc.name, func(t *testing.T) {
bc := testutil.NewBuildContext(t)
ops, err := tc.ops().BuildOperators(bc, nil)
require.NoError(t, err)
require.Equal(t, len(tc.expectedOutputs), len(ops)-1)
for i := 0; i < len(ops)-1; i++ {
require.Equal(t, tc.expectedOutputs[i], ops[i].GetOutputIDs()[0])
}
})
}
}
Loading

0 comments on commit 049d0cb

Please sign in to comment.