Skip to content

Commit

Permalink
Support flattened data_stream.* fields
Browse files Browse the repository at this point in the history
An input configuration supports flattened fields, however the
'data_stream' field was not being correctly decoded when
flattened. This commit fixes this issue.
  • Loading branch information
belimawr committed Sep 20, 2023
1 parent 735a8fb commit d0c797a
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 0 deletions.
71 changes: 71 additions & 0 deletions pkg/component/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2361,3 +2361,74 @@ func gatherDurationFieldPaths(s interface{}, pathSoFar string) []string {

return gatheredPaths
}

func TestFlattenedDataStream(t *testing.T) {
expectedNamespace := "namespace"
expectedType := "type"
expectedDataset := "dataset"

policy := map[string]any{
"outputs": map[string]any{
"default": map[string]any{
"type": "elasticsearch",
"enabled": true,
},
},
"inputs": []any{
map[string]any{
"type": "filestream",
"id": "filestream-0",
"enabled": true,
"data_stream.type": expectedType,
"data_stream.dataset": expectedDataset,
"data_stream": map[string]any{
"namespace": expectedNamespace,
},
},
},
}
runtime, err := LoadRuntimeSpecs(filepath.Join("..", "..", "specs"), PlatformDetail{}, SkipBinaryCheck())
if err != nil {
t.Fatalf("cannot load runtime specs: %s", err)
}

result, err := runtime.ToComponents(policy, nil, logp.DebugLevel, nil)
if err != nil {
t.Fatalf("cannot convert policy to component: %s", err)
}

if len(result) != 1 {
t.Fatalf("expecting result to have one element, got %d", len(result))
}

if len(result[0].Units) != 2 {
t.Fatalf("expecting result[0].Units to have two elements, got %d", len(result))
}

// We do not make assumptions about ordering.
// Get the input Unit
var dataStream *proto.DataStream
for _, unit := range result[0].Units {
if unit.Err != nil {
t.Fatalf("unit.Err: %s", unit.Err)
}
if unit.Type == client.UnitTypeInput {
dataStream = unit.Config.DataStream
break
}
}

if dataStream == nil {
t.Fatal("DataStream cannot be nil")
}

if dataStream.Dataset != expectedDataset {
t.Errorf("expecting DataStream.Dataset: %q, got: %q", expectedDataset, dataStream.Dataset)
}
if dataStream.Type != expectedType {
t.Errorf("expecting DataStream.Type: %q, got: %q", expectedType, dataStream.Type)
}
if dataStream.Namespace != expectedNamespace {
t.Errorf("expecting DataStream.Namespace: %q, got: %q", expectedNamespace, dataStream.Namespace)
}
}
42 changes: 42 additions & 0 deletions pkg/component/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"google.golang.org/protobuf/types/known/structpb"

"github.com/elastic/elastic-agent-client/v7/pkg/proto"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent/pkg/limits"
)

Expand Down Expand Up @@ -100,9 +101,50 @@ func ExpectedConfig(cfg map[string]interface{}) (*proto.UnitExpectedConfig, erro
return nil, err
}

if err := dedotDataStream(result); err != nil {
return nil, fmt.Errorf("could not dedot 'data_stream': %w", err)
}

return result, nil
}

func dedotDataStream(unitConfig *proto.UnitExpectedConfig) error {
tmp := struct {
DataStream struct {
Dataset string `config:"dataset" yaml:"dataset"`
Type string `config:"type" yaml:"type"`
Namespace string `config:"namespace" yaml:"namespace"`
} `config:"data_stream" yaml:"data_stream"`
}{}

cfg, err := config.NewConfigFrom(unitConfig.GetSource().AsMap())
if err != nil {
return fmt.Errorf("cannot create new config from Source: %w", err)
}

if err := cfg.Unpack(&tmp); err != nil {
return fmt.Errorf("cannot unpack config: %w", err)
}

// merge returns b if a is an empty string
merge := func(a, b string) string {
if a == "" {
return b
}
return a
}

if unitConfig.DataStream == nil {
unitConfig.DataStream = &proto.DataStream{}
}

unitConfig.DataStream.Dataset = merge(unitConfig.DataStream.Dataset, tmp.DataStream.Dataset)
unitConfig.DataStream.Namespace = merge(unitConfig.DataStream.Namespace, tmp.DataStream.Namespace)
unitConfig.DataStream.Type = merge(unitConfig.DataStream.Type, tmp.DataStream.Type)

return nil
}

func setSource(val interface{}, cfg map[string]interface{}) error {
// find the source field on the val
resVal := reflect.ValueOf(val).Elem()
Expand Down

0 comments on commit d0c797a

Please sign in to comment.