Skip to content

Commit

Permalink
Support flattened data_stream.* fields
Browse files Browse the repository at this point in the history
An input configuration supports flattened fields, however the
'data_stream' field was not being correctly decoded when
flattened. This commit fixes this issue.

Some small additions and refactoring are also implemented in the
integration test framework as well as some more detailed
documentation.
  • Loading branch information
belimawr committed Oct 9, 2023
1 parent 33c6934 commit da565c3
Show file tree
Hide file tree
Showing 8 changed files with 621 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: Support flattened data_stream.* fields

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: >-
An input configuration supports flattened fields, however the
'data_stream' field was not being correctly decoded when
flattened. This commit fixes this issue.
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: elastic-agent

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/3465

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/3191
71 changes: 71 additions & 0 deletions pkg/component/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2361,3 +2361,74 @@ func gatherDurationFieldPaths(s interface{}, pathSoFar string) []string {

return gatheredPaths
}

func TestFlattenedDataStream(t *testing.T) {
expectedNamespace := "test-namespace"
expectedType := "test-type"
expectedDataset := "test-dataset"

policy := map[string]any{
"outputs": map[string]any{
"default": map[string]any{
"type": "elasticsearch",
"enabled": true,
},
},
"inputs": []any{
map[string]any{
"type": "filestream",
"id": "filestream-0",
"enabled": true,
"data_stream.type": expectedType,
"data_stream.dataset": expectedDataset,
"data_stream": map[string]any{
"namespace": expectedNamespace,
},
},
},
}
runtime, err := LoadRuntimeSpecs(filepath.Join("..", "..", "specs"), PlatformDetail{}, SkipBinaryCheck())
if err != nil {
t.Fatalf("cannot load runtime specs: %s", err)
}

result, err := runtime.ToComponents(policy, nil, logp.DebugLevel, nil)
if err != nil {
t.Fatalf("cannot convert policy to component: %s", err)
}

if len(result) != 1 {
t.Fatalf("expecting result to have one element, got %d", len(result))
}

if len(result[0].Units) != 2 {
t.Fatalf("expecting result[0].Units to have two elements, got %d", len(result))
}

// We do not make assumptions about ordering.
// Get the input Unit
var dataStream *proto.DataStream
for _, unit := range result[0].Units {
if unit.Err != nil {
t.Fatalf("unit.Err: %s", unit.Err)
}
if unit.Type == client.UnitTypeInput {
dataStream = unit.Config.DataStream
break
}
}

if dataStream == nil {
t.Fatal("DataStream cannot be nil")
}

if dataStream.Dataset != expectedDataset {
t.Errorf("expecting DataStream.Dataset: %q, got: %q", expectedDataset, dataStream.Dataset)
}
if dataStream.Type != expectedType {
t.Errorf("expecting DataStream.Type: %q, got: %q", expectedType, dataStream.Type)
}
if dataStream.Namespace != expectedNamespace {
t.Errorf("expecting DataStream.Namespace: %q, got: %q", expectedNamespace, dataStream.Namespace)
}
}
83 changes: 83 additions & 0 deletions pkg/component/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"google.golang.org/protobuf/types/known/structpb"

"github.com/elastic/elastic-agent-client/v7/pkg/proto"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent/pkg/limits"
)

Expand Down Expand Up @@ -100,9 +101,91 @@ func ExpectedConfig(cfg map[string]interface{}) (*proto.UnitExpectedConfig, erro
return nil, err
}

if err := updateDataStreamsFromSource(result); err != nil {
return nil, fmt.Errorf("could not dedot 'data_stream': %w", err)
}

return result, nil
}

// dataStreamAndSource is a generic way to represent proto mesages
// that contain a source field and a datastream field.
type dataStreamAndSource interface {
GetDataStream() *proto.DataStream
GetSource() *structpb.Struct
}

func deDotDataStream(raw dataStreamAndSource) (*proto.DataStream, error) {
ds := raw.GetDataStream()
if ds == nil {
ds = &proto.DataStream{}
}

tmp := struct {
DataStream struct {
Dataset string `config:"dataset" yaml:"dataset"`
Type string `config:"type" yaml:"type"`
Namespace string `config:"namespace" yaml:"namespace"`
} `config:"data_stream" yaml:"data_stream"`
}{}

cfg, err := config.NewConfigFrom(raw.GetSource().AsMap())
if err != nil {
return nil, fmt.Errorf("cannot generate config from source field: %w", err)
}

if err := cfg.Unpack(&tmp); err != nil {
return nil, fmt.Errorf("cannot unpack source field into struct: %w", err)
}

if (ds.Dataset != tmp.DataStream.Dataset) && (ds.Dataset != "" && tmp.DataStream.Dataset != "") {
return nil, errors.New("duplicated key 'datastream.dataset'")
}

if (ds.Type != tmp.DataStream.Type) && (ds.Type != "" && tmp.DataStream.Type != "") {
return nil, errors.New("duplicated key 'datastream.type'")
}

if (ds.Namespace != tmp.DataStream.Namespace) && (ds.Namespace != "" && tmp.DataStream.Namespace != "") {
return nil, errors.New("duplicated key 'datastream.namespace'")
}

ret := &proto.DataStream{
Dataset: merge(tmp.DataStream.Dataset, ds.Dataset),
Type: merge(tmp.DataStream.Type, ds.Type),
Namespace: merge(tmp.DataStream.Namespace, ds.Namespace),
Source: raw.GetDataStream().GetSource(),
}

return ret, nil
}

// merge returns b if a is an empty string
func merge(a, b string) string {
if a == "" {
return b
}
return a
}

func updateDataStreamsFromSource(unitConfig *proto.UnitExpectedConfig) error {
var err error
unitConfig.DataStream, err = deDotDataStream(unitConfig)
if err != nil {
return fmt.Errorf("could not parse data_stream from input: %w", err)
}

for i, stream := range unitConfig.Streams {
stream.DataStream, err = deDotDataStream(stream)
if err != nil {
return fmt.Errorf("could not parse data_stream from stream [%d]: %w",
i, err)
}
}

return nil
}

func setSource(val interface{}, cfg map[string]interface{}) error {
// find the source field on the val
resVal := reflect.ValueOf(val).Elem()
Expand Down
9 changes: 8 additions & 1 deletion pkg/component/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"errors"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/structpb"

"github.com/elastic/elastic-agent-client/v7/pkg/proto"
Expand Down Expand Up @@ -197,7 +199,12 @@ func TestExpectedConfig(t *testing.T) {
assert.Equal(t, err.Error(), scenario.Err.Error())
} else {
require.NoError(t, err)
assert.EqualValues(t, scenario.Expected, observed)
// protocmp.Transform ensures we do not compare any internal
// protobuf fields
if !cmp.Equal(scenario.Expected, observed, protocmp.Transform()) {
t.Errorf("mismatch (-want +got) \n%s",
cmp.Diff(scenario.Expected, observed, protocmp.Transform()))
}
}
})
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/testing/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,13 @@ func ExtractArtifact(l Logger, artifactFile, outputDir string) error {
// with `WithAllowErrors()` then `Run` will exit early and return the logged error.
//
// If no `states` are provided then the Elastic Agent runs until the context is cancelled.
//
// The Elastic-Agent is started agent in test mode (--testing-mode) this mode
// expects the initial configuration (full YAML config) via gRPC.
// This configuration should be passed in the State.Configure field.
//
// The `elastic-agent.yml` generated by `Fixture.Configure` is ignored
// when `Run` is called.
func (f *Fixture) Run(ctx context.Context, states ...State) error {
if f.installed {
return errors.New("fixture is installed; cannot be run")
Expand Down
65 changes: 60 additions & 5 deletions pkg/testing/tools/estools/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,9 @@ func CheckForErrorsInLogsWithContext(ctx context.Context, client elastictranspor
return handleDocsResponse(res)
}

// GetLogsForDatastream returns any logs associated with the datastream
func GetLogsForDatastream(client elastictransport.Interface, index string) (Documents, error) {
return GetLogsForDatastreamWithContext(context.Background(), client, index)
// GetLogsForDataset returns any logs associated with the datastream
func GetLogsForDataset(client elastictransport.Interface, index string) (Documents, error) {
return GetLogsForDatasetWithContext(context.Background(), client, index)
}

// GetLogsForAgentID returns any logs associated with the agent ID
Expand Down Expand Up @@ -270,8 +270,8 @@ func GetLogsForAgentID(client elastictransport.Interface, id string) (Documents,
return handleDocsResponse(res)
}

// GetLogsForDatastreamWithContext returns any logs associated with the datastream
func GetLogsForDatastreamWithContext(ctx context.Context, client elastictransport.Interface, index string) (Documents, error) {
// GetLogsForDatasetWithContext returns any logs associated with the datastream
func GetLogsForDatasetWithContext(ctx context.Context, client elastictransport.Interface, index string) (Documents, error) {
indexQuery := map[string]interface{}{
"query": map[string]interface{}{
"match": map[string]interface{}{
Expand Down Expand Up @@ -302,7 +302,62 @@ func GetLogsForDatastreamWithContext(ctx context.Context, client elastictranspor
return handleDocsResponse(res)
}

// GetLogsForDatastream returns any logs associated with the datastream
func GetLogsForDatastream(
ctx context.Context,
client elastictransport.Interface,
dsType, dataset, namespace string) (Documents, error) {

query := map[string]any{
"_source": []string{"message"},
"query": map[string]any{
"bool": map[string]any{
"must": []any{
map[string]any{
"match": map[string]any{
"data_stream.dataset": dataset,
},
},
map[string]any{
"match": map[string]any{
"data_stream.namespace": namespace,
},
},
map[string]any{
"match": map[string]any{
"data_stream.type": dsType,
},
},
},
},
},
}

var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(query); err != nil {
return Documents{}, fmt.Errorf("error creating ES query: %w", err)
}

es := esapi.New(client)
res, err := es.Search(
es.Search.WithIndex(fmt.Sprintf(".ds-%s*", dsType)),
es.Search.WithExpandWildcards("all"),
es.Search.WithBody(&buf),
es.Search.WithTrackTotalHits(true),
es.Search.WithPretty(),
es.Search.WithContext(ctx),
)
if err != nil {
return Documents{}, fmt.Errorf("error performing ES search: %w", err)
}

return handleDocsResponse(res)
}

// handleDocsResponse converts the esapi.Response into Documents,
// it closes the response.Body after reading
func handleDocsResponse(res *esapi.Response) (Documents, error) {
defer res.Body.Close()
if res.StatusCode >= 300 || res.StatusCode < 200 {
return Documents{}, fmt.Errorf("non-200 return code: %v, response: '%s'", res.StatusCode, res.String())
}
Expand Down
Loading

0 comments on commit da565c3

Please sign in to comment.