From 374ebffdb62aa6998158063aa4c9537b126d0f32 Mon Sep 17 00:00:00 2001 From: Camden Cheek Date: Mon, 10 Aug 2020 11:14:35 -0400 Subject: [PATCH 1/4] Add host metadata decorator --- internal/testutil/mocks.go | 51 ++------ .../builtin/transformer/host_decorator.go | 73 +++++++++++ .../transformer/host_decorator_test.go | 120 ++++++++++++++++++ 3 files changed, 202 insertions(+), 42 deletions(-) create mode 100644 operator/builtin/transformer/host_decorator.go create mode 100644 operator/builtin/transformer/host_decorator_test.go diff --git a/internal/testutil/mocks.go b/internal/testutil/mocks.go index 8e25dd809..5aca324e9 100644 --- a/internal/testutil/mocks.go +++ b/internal/testutil/mocks.go @@ -30,50 +30,17 @@ func NewFakeOutput(t TestingT) *FakeOutput { } } -func (f *FakeOutput) CanOutput() bool { - return false -} - -func (f *FakeOutput) CanProcess() bool { - return true -} +func (f *FakeOutput) CanOutput() bool { return false } +func (f *FakeOutput) CanProcess() bool { return true } +func (f *FakeOutput) ID() string { return "fake" } +func (f *FakeOutput) Logger() *zap.SugaredLogger { return f.SugaredLogger } +func (f *FakeOutput) Outputs() []operator.Operator { return nil } +func (f *FakeOutput) SetOutputs(outputs []operator.Operator) error { return nil } +func (f *FakeOutput) Start() error { return nil } +func (f *FakeOutput) Stop() error { return nil } +func (f *FakeOutput) Type() string { return "fake_output" } -func (f *FakeOutput) ID() string { - return "fake" -} - -// Logger provides a mock function with given fields: -func (f *FakeOutput) Logger() *zap.SugaredLogger { - return f.SugaredLogger -} - -// Outputs provides a mock function with given fields: -func (f *FakeOutput) Outputs() []operator.Operator { - return nil -} - -// Process provides a mock function with given fields: _a0, _a1 func (f *FakeOutput) Process(ctx context.Context, entry *entry.Entry) error { f.Received <- entry return nil } - -// SetOutputs provides a mock function with given fields: _a0 -func (f *FakeOutput) SetOutputs(outputs []operator.Operator) error { - return nil -} - -// Start provides a mock function with given fields: -func (f *FakeOutput) Start() error { - return nil -} - -// Stop provides a mock function with given fields: -func (f *FakeOutput) Stop() error { - return nil -} - -// Type provides a mock function with given fields: -func (f *FakeOutput) Type() string { - return "fake_output" -} diff --git a/operator/builtin/transformer/host_decorator.go b/operator/builtin/transformer/host_decorator.go new file mode 100644 index 000000000..7dfaae737 --- /dev/null +++ b/operator/builtin/transformer/host_decorator.go @@ -0,0 +1,73 @@ +package transformer + +import ( + "context" + "os" + + "github.com/observiq/carbon/entry" + "github.com/observiq/carbon/errors" + "github.com/observiq/carbon/operator" + "github.com/observiq/carbon/operator/helper" +) + +func init() { + operator.Register("host_decorator", func() operator.Builder { return NewHostDecoratorConfig("") }) +} + +// NewHostDecoratorConfig returns a HostDecoratorConfig with default values +func NewHostDecoratorConfig(operatorID string) *HostDecoratorConfig { + return &HostDecoratorConfig{ + TransformerConfig: helper.NewTransformerConfig(operatorID, "host_decorator"), + IncludeHostname: true, + } +} + +// +type HostDecoratorConfig struct { + helper.TransformerConfig `yaml:",inline"` + IncludeHostname bool `json:"include_hostname,omitempty" yaml:"include_hostname,omitempty"` +} + +// Build will build an operator from the supplied configuration +func (c HostDecoratorConfig) Build(context operator.BuildContext) (operator.Operator, error) { + transformerOperator, err := c.TransformerConfig.Build(context) + if err != nil { + return nil, errors.Wrap(err, "failed to build transformer") + } + + op := &HostDecorator{ + TransformerOperator: transformerOperator, + includeHostname: c.IncludeHostname, + } + + if c.IncludeHostname { + op.hostname, err = os.Hostname() + if err != nil { + return nil, errors.Wrap(err, "get hostname") + } + } + + return op, nil +} + +// HostDecorator is an operator that can add host metadata to incoming entries +type HostDecorator struct { + helper.TransformerOperator + + hostname string + includeHostname bool +} + +// Process will process an incoming entry using the metadata transform. +func (h *HostDecorator) Process(ctx context.Context, entry *entry.Entry) error { + return h.ProcessWith(ctx, entry, h.Transform) +} + +// Transform will transform an entry, adding the configured host metadata. +func (h *HostDecorator) Transform(entry *entry.Entry) (*entry.Entry, error) { + if h.includeHostname { + entry.AddLabel("hostname", h.hostname) + } + + return entry, nil +} diff --git a/operator/builtin/transformer/host_decorator_test.go b/operator/builtin/transformer/host_decorator_test.go new file mode 100644 index 000000000..234c407c3 --- /dev/null +++ b/operator/builtin/transformer/host_decorator_test.go @@ -0,0 +1,120 @@ +package transformer + +import ( + "context" + "sync" + "testing" + + "github.com/observiq/carbon/entry" + "github.com/observiq/carbon/internal/testutil" + "github.com/observiq/carbon/operator" + "github.com/stretchr/testify/require" +) + +func TestHostDecorator(t *testing.T) { + + cases := []struct { + name string + hd *HostDecorator + expectedLabels map[string]string + }{ + { + "Default", + func() *HostDecorator { + op, err := NewHostDecoratorConfig("").Build(testutil.NewBuildContext(t)) + require.NoError(t, err) + hd := op.(*HostDecorator) + hd.hostname = "test" + return hd + }(), + map[string]string{ + "hostname": "test", + }, + }, + { + "NoHostname", + func() *HostDecorator { + cfg := NewHostDecoratorConfig("") + cfg.IncludeHostname = false + op, err := cfg.Build(testutil.NewBuildContext(t)) + require.NoError(t, err) + hd := op.(*HostDecorator) + hd.hostname = "test" + return hd + }(), + nil, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + fake := testutil.NewFakeOutput(t) + tc.hd.OutputOperators = []operator.Operator{fake} + e := entry.New() + tc.hd.Process(context.Background(), e) + select { + case r := <-fake.Received: + require.Equal(t, tc.expectedLabels, r.Labels) + default: + require.FailNow(t, "Expected entry") + } + }) + } +} + +type hostDecoratorBenchmark struct { + name string + cfgMod func(*HostDecoratorConfig) +} + +func (g *hostDecoratorBenchmark) Run(b *testing.B) { + cfg := NewHostDecoratorConfig(g.name) + g.cfgMod(cfg) + op, err := cfg.Build(testutil.NewBuildContext(b)) + require.NoError(b, err) + + fake := testutil.NewFakeOutput(b) + op.(*HostDecorator).OutputOperators = []operator.Operator{fake} + + b.ResetTimer() + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < b.N; i++ { + e := entry.New() + op.Process(context.Background(), e) + } + err = op.Stop() + require.NoError(b, err) + }() + + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < b.N; i++ { + <-fake.Received + } + }() + + wg.Wait() +} + +func BenchmarkGoogleCloudOutput(b *testing.B) { + cases := []hostDecoratorBenchmark{ + { + "Default", + func(cfg *HostDecoratorConfig) {}, + }, + { + "NoHostname", + func(cfg *HostDecoratorConfig) { + cfg.IncludeHostname = false + }, + }, + } + + for _, tc := range cases { + b.Run(tc.name, tc.Run) + } +} From 030d62d47181216fc85c944682208e5909a08b04 Mon Sep 17 00:00:00 2001 From: Camden Cheek Date: Mon, 10 Aug 2020 11:30:00 -0400 Subject: [PATCH 2/4] Rename to host metadata --- docs/README.md | 1 + docs/operators/host_metadata.md | 56 +++++++++++++++++++ .../{host_decorator.go => host_metadata.go} | 22 ++++---- ...ecorator_test.go => host_metadata_test.go} | 32 +++++------ 4 files changed, 84 insertions(+), 27 deletions(-) create mode 100644 docs/operators/host_metadata.md rename operator/builtin/transformer/{host_decorator.go => host_metadata.go} (65%) rename operator/builtin/transformer/{host_decorator_test.go => host_metadata_test.go} (72%) diff --git a/docs/README.md b/docs/README.md index 2850f1b37..021e38f1e 100644 --- a/docs/README.md +++ b/docs/README.md @@ -49,6 +49,7 @@ General purpose: - [Restructure records](/docs/operators/restructure.md) - [Router](/docs/operators/router.md) - [Kubernetes Metadata Decorator](/docs/operators/k8s_metadata_decorator.md) +- [Host Metadata](/docs/operators/host_metadata.md) - [Rate limit](/docs/operators/rate_limit.md) Or create your own [plugins](/docs/plugins.md) for a technology-specific use case. diff --git a/docs/operators/host_metadata.md b/docs/operators/host_metadata.md new file mode 100644 index 000000000..7128fe802 --- /dev/null +++ b/docs/operators/host_metadata.md @@ -0,0 +1,56 @@ +## `host_decorator` operator + +The `host_decorator` operator adds labels to incoming entries. + +### Configuration Fields + +| Field | Default | Description | +| --- | --- | --- | +| `id` | `metadata` | A unique identifier for the operator | +| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries | +| `include_hostname` | `true` | Whether to set the `hostname` label on entries | +| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md) | + +### Example Configurations + +#### Add static tags and labels + +Configuration: +```yaml +- type: host_decorator + include_hostname: true +``` + + + + + + + +
Input entry Output entry
+ +```json +{ + "timestamp": "2020-06-15T11:15:50.475364-04:00", + "labels": {}, + "record": { + "message": "test" + } +} +``` + + + +```json +{ + "timestamp": "2020-06-15T11:15:50.475364-04:00", + "labels": { + "hostname": "my_host" + }, + "record": { + "message": "test" + } +} +``` + +
diff --git a/operator/builtin/transformer/host_decorator.go b/operator/builtin/transformer/host_metadata.go similarity index 65% rename from operator/builtin/transformer/host_decorator.go rename to operator/builtin/transformer/host_metadata.go index 7dfaae737..eb051b3c9 100644 --- a/operator/builtin/transformer/host_decorator.go +++ b/operator/builtin/transformer/host_metadata.go @@ -11,31 +11,31 @@ import ( ) func init() { - operator.Register("host_decorator", func() operator.Builder { return NewHostDecoratorConfig("") }) + operator.Register("host_metadata", func() operator.Builder { return NewHostMetadataConfig("") }) } -// NewHostDecoratorConfig returns a HostDecoratorConfig with default values -func NewHostDecoratorConfig(operatorID string) *HostDecoratorConfig { - return &HostDecoratorConfig{ +// NewHostMetadataConfig returns a HostMetadataConfig with default values +func NewHostMetadataConfig(operatorID string) *HostMetadataConfig { + return &HostMetadataConfig{ TransformerConfig: helper.NewTransformerConfig(operatorID, "host_decorator"), IncludeHostname: true, } } // -type HostDecoratorConfig struct { +type HostMetadataConfig struct { helper.TransformerConfig `yaml:",inline"` IncludeHostname bool `json:"include_hostname,omitempty" yaml:"include_hostname,omitempty"` } // Build will build an operator from the supplied configuration -func (c HostDecoratorConfig) Build(context operator.BuildContext) (operator.Operator, error) { +func (c HostMetadataConfig) Build(context operator.BuildContext) (operator.Operator, error) { transformerOperator, err := c.TransformerConfig.Build(context) if err != nil { return nil, errors.Wrap(err, "failed to build transformer") } - op := &HostDecorator{ + op := &HostMetadata{ TransformerOperator: transformerOperator, includeHostname: c.IncludeHostname, } @@ -50,8 +50,8 @@ func (c HostDecoratorConfig) Build(context operator.BuildContext) (operator.Oper return op, nil } -// HostDecorator is an operator that can add host metadata to incoming entries -type HostDecorator struct { +// HostMetadata is an operator that can add host metadata to incoming entries +type HostMetadata struct { helper.TransformerOperator hostname string @@ -59,12 +59,12 @@ type HostDecorator struct { } // Process will process an incoming entry using the metadata transform. -func (h *HostDecorator) Process(ctx context.Context, entry *entry.Entry) error { +func (h *HostMetadata) Process(ctx context.Context, entry *entry.Entry) error { return h.ProcessWith(ctx, entry, h.Transform) } // Transform will transform an entry, adding the configured host metadata. -func (h *HostDecorator) Transform(entry *entry.Entry) (*entry.Entry, error) { +func (h *HostMetadata) Transform(entry *entry.Entry) (*entry.Entry, error) { if h.includeHostname { entry.AddLabel("hostname", h.hostname) } diff --git a/operator/builtin/transformer/host_decorator_test.go b/operator/builtin/transformer/host_metadata_test.go similarity index 72% rename from operator/builtin/transformer/host_decorator_test.go rename to operator/builtin/transformer/host_metadata_test.go index 234c407c3..2561d583d 100644 --- a/operator/builtin/transformer/host_decorator_test.go +++ b/operator/builtin/transformer/host_metadata_test.go @@ -11,19 +11,19 @@ import ( "github.com/stretchr/testify/require" ) -func TestHostDecorator(t *testing.T) { +func TestHostMetadata(t *testing.T) { cases := []struct { name string - hd *HostDecorator + hd *HostMetadata expectedLabels map[string]string }{ { "Default", - func() *HostDecorator { - op, err := NewHostDecoratorConfig("").Build(testutil.NewBuildContext(t)) + func() *HostMetadata { + op, err := NewHostMetadataConfig("").Build(testutil.NewBuildContext(t)) require.NoError(t, err) - hd := op.(*HostDecorator) + hd := op.(*HostMetadata) hd.hostname = "test" return hd }(), @@ -33,12 +33,12 @@ func TestHostDecorator(t *testing.T) { }, { "NoHostname", - func() *HostDecorator { - cfg := NewHostDecoratorConfig("") + func() *HostMetadata { + cfg := NewHostMetadataConfig("") cfg.IncludeHostname = false op, err := cfg.Build(testutil.NewBuildContext(t)) require.NoError(t, err) - hd := op.(*HostDecorator) + hd := op.(*HostMetadata) hd.hostname = "test" return hd }(), @@ -62,19 +62,19 @@ func TestHostDecorator(t *testing.T) { } } -type hostDecoratorBenchmark struct { +type hostMetadataBenchmark struct { name string - cfgMod func(*HostDecoratorConfig) + cfgMod func(*HostMetadataConfig) } -func (g *hostDecoratorBenchmark) Run(b *testing.B) { - cfg := NewHostDecoratorConfig(g.name) +func (g *hostMetadataBenchmark) Run(b *testing.B) { + cfg := NewHostMetadataConfig(g.name) g.cfgMod(cfg) op, err := cfg.Build(testutil.NewBuildContext(b)) require.NoError(b, err) fake := testutil.NewFakeOutput(b) - op.(*HostDecorator).OutputOperators = []operator.Operator{fake} + op.(*HostMetadata).OutputOperators = []operator.Operator{fake} b.ResetTimer() var wg sync.WaitGroup @@ -101,14 +101,14 @@ func (g *hostDecoratorBenchmark) Run(b *testing.B) { } func BenchmarkGoogleCloudOutput(b *testing.B) { - cases := []hostDecoratorBenchmark{ + cases := []hostMetadataBenchmark{ { "Default", - func(cfg *HostDecoratorConfig) {}, + func(cfg *HostMetadataConfig) {}, }, { "NoHostname", - func(cfg *HostDecoratorConfig) { + func(cfg *HostMetadataConfig) { cfg.IncludeHostname = false }, }, From b1ad44c2ef7642dfc3a82fa83d3c47c05e718aed Mon Sep 17 00:00:00 2001 From: Camden Cheek Date: Mon, 10 Aug 2020 11:53:54 -0400 Subject: [PATCH 3/4] Update changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3855421e7..fbed79679 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased ### Fixed - Google Cloud Output failure when sent a field of type uint16 +### Added - Added a default function to plugin templates +- Add a host metadata operator that adds hostname to entries ## [0.9.7] - 2020-08-05 ### Changed From 149f66d5b32eba16a3cd8cb56d76599c455fcb72 Mon Sep 17 00:00:00 2001 From: Camden Cheek Date: Wed, 12 Aug 2020 12:08:36 -0400 Subject: [PATCH 4/4] Add IP address support --- docs/operators/host_metadata.md | 6 +- operator/builtin/transformer/host_metadata.go | 61 ++++++++++++++++++- .../builtin/transformer/host_metadata_test.go | 50 +++++++++------ 3 files changed, 93 insertions(+), 24 deletions(-) diff --git a/docs/operators/host_metadata.md b/docs/operators/host_metadata.md index 7128fe802..7dbee7423 100644 --- a/docs/operators/host_metadata.md +++ b/docs/operators/host_metadata.md @@ -1,6 +1,6 @@ -## `host_decorator` operator +## `host_metadata` operator -The `host_decorator` operator adds labels to incoming entries. +The `host_metadata` operator adds labels to incoming entries. ### Configuration Fields @@ -17,7 +17,7 @@ The `host_decorator` operator adds labels to incoming entries. Configuration: ```yaml -- type: host_decorator +- type: host_metadata include_hostname: true ``` diff --git a/operator/builtin/transformer/host_metadata.go b/operator/builtin/transformer/host_metadata.go index eb051b3c9..448b9eb57 100644 --- a/operator/builtin/transformer/host_metadata.go +++ b/operator/builtin/transformer/host_metadata.go @@ -2,6 +2,7 @@ package transformer import ( "context" + "net" "os" "github.com/observiq/carbon/entry" @@ -14,11 +15,15 @@ func init() { operator.Register("host_metadata", func() operator.Builder { return NewHostMetadataConfig("") }) } +// Variables that are overridable for testing +var hostname = os.Hostname + // NewHostMetadataConfig returns a HostMetadataConfig with default values func NewHostMetadataConfig(operatorID string) *HostMetadataConfig { return &HostMetadataConfig{ TransformerConfig: helper.NewTransformerConfig(operatorID, "host_decorator"), IncludeHostname: true, + IncludeIP: true, } } @@ -26,6 +31,7 @@ func NewHostMetadataConfig(operatorID string) *HostMetadataConfig { type HostMetadataConfig struct { helper.TransformerConfig `yaml:",inline"` IncludeHostname bool `json:"include_hostname,omitempty" yaml:"include_hostname,omitempty"` + IncludeIP bool `json:"include_ip,omitempty" yaml:"include_ip,omitempty"` } // Build will build an operator from the supplied configuration @@ -38,24 +44,73 @@ func (c HostMetadataConfig) Build(context operator.BuildContext) (operator.Opera op := &HostMetadata{ TransformerOperator: transformerOperator, includeHostname: c.IncludeHostname, + includeIP: c.IncludeIP, } if c.IncludeHostname { - op.hostname, err = os.Hostname() + op.hostname, err = hostname() if err != nil { return nil, errors.Wrap(err, "get hostname") } } + if c.IncludeIP { + ip, err := getIP() + if err != nil { + return nil, errors.Wrap(err, "get ip address") + } + op.ip = ip + } + return op, nil } +func getIP() (string, error) { + var ip string + + ifaces, err := net.Interfaces() + if err != nil { + return "", errors.Wrap(err, "list interfaces") + } + + for _, iface := range ifaces { + // Skip loopback interfaces + if iface.Flags&net.FlagLoopback != 0 { + continue + } + + // Skip down interfaces + if iface.Flags&net.FlagUp == 0 { + continue + } + + addrs, err := iface.Addrs() + if err != nil { + continue + } + if len(addrs) > 0 { + ip = addrs[0].String() + } + } + + if len(ip) == 0 { + return "", errors.NewError( + "failed to find ip address", + "check that a non-loopback interface with an assigned IP address exists and is running", + ) + } + + return ip, nil +} + // HostMetadata is an operator that can add host metadata to incoming entries type HostMetadata struct { helper.TransformerOperator hostname string + ip string includeHostname bool + includeIP bool } // Process will process an incoming entry using the metadata transform. @@ -69,5 +124,9 @@ func (h *HostMetadata) Transform(entry *entry.Entry) (*entry.Entry, error) { entry.AddLabel("hostname", h.hostname) } + if h.includeIP { + entry.AddLabel("ip", h.ip) + } + return entry, nil } diff --git a/operator/builtin/transformer/host_metadata_test.go b/operator/builtin/transformer/host_metadata_test.go index 2561d583d..b2874aad1 100644 --- a/operator/builtin/transformer/host_metadata_test.go +++ b/operator/builtin/transformer/host_metadata_test.go @@ -2,6 +2,7 @@ package transformer import ( "context" + "os" "sync" "testing" @@ -11,47 +12,56 @@ import ( "github.com/stretchr/testify/require" ) -func TestHostMetadata(t *testing.T) { +func testHostname() (string, error) { + return "test", nil +} +func TestHostMetadata(t *testing.T) { cases := []struct { name string - hd *HostMetadata + modifyConfig func(*HostMetadataConfig) expectedLabels map[string]string + fakeHostname func() (string, error) }{ { "Default", - func() *HostMetadata { - op, err := NewHostMetadataConfig("").Build(testutil.NewBuildContext(t)) - require.NoError(t, err) - hd := op.(*HostMetadata) - hd.hostname = "test" - return hd - }(), + func(cfg *HostMetadataConfig) { + cfg.IncludeIP = false + }, map[string]string{ "hostname": "test", }, + testHostname, }, { "NoHostname", - func() *HostMetadata { - cfg := NewHostMetadataConfig("") + func(cfg *HostMetadataConfig) { cfg.IncludeHostname = false - op, err := cfg.Build(testutil.NewBuildContext(t)) - require.NoError(t, err) - hd := op.(*HostMetadata) - hd.hostname = "test" - return hd - }(), + cfg.IncludeIP = false + }, nil, + testHostname, }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { + hostname = tc.fakeHostname + defer func() { hostname = os.Hostname }() + + cfg := NewHostMetadataConfig("test_id") + cfg.OutputIDs = []string{"fake"} + tc.modifyConfig(cfg) + + op, err := cfg.Build(testutil.NewBuildContext(t)) + require.NoError(t, err) + fake := testutil.NewFakeOutput(t) - tc.hd.OutputOperators = []operator.Operator{fake} + err = op.SetOutputs([]operator.Operator{fake}) + require.NoError(t, err) + e := entry.New() - tc.hd.Process(context.Background(), e) + op.Process(context.Background(), e) select { case r := <-fake.Received: require.Equal(t, tc.expectedLabels, r.Labels) @@ -100,7 +110,7 @@ func (g *hostMetadataBenchmark) Run(b *testing.B) { wg.Wait() } -func BenchmarkGoogleCloudOutput(b *testing.B) { +func BenchmarkHostMetadata(b *testing.B) { cases := []hostMetadataBenchmark{ { "Default",