Skip to content
This repository has been archived by the owner on Sep 20, 2023. It is now read-only.

Commit

Permalink
Move basic input architecture from Filebeat (#17)
Browse files Browse the repository at this point in the history
## What does this PR do?

This PR moves the input management v2 from Filebeat. Each input is descirbed by a `Plugin`. The plugin contains the `InputManager` that is able to create an `Input`, provide access to the statestore, etc. I have moved two input managers, `input-stateless` for stateless inputs and `input-cursor` for inputs with position tracking. `filestream` uses a custom `InputManager` that lets you cancel a running input and pass its state information to a new input. That special input is coming with `filestream`.

Example for a `stateless` input:
```golang
 func Plugin() input.Plugin {
     return input.Plugin{
         Name: "myservice",
         Stability: feature.Stable,
         Deprecated: false,
         Info: "collect data from myservice",
         Manager: stateless.NewInputManager(configure),
     }
 }
```

An `Input` must fulfill the following interface:
```golang
// Input is a configured input object that can be used to test or start
// the actual data collection.
type Input interface {
    // Name reports the input name.
    Name() string

    // Test checks the configuaration and runs additional checks if the Input can
    // actually collect data for the given configuration (e.g. check if host/port or files are
    // accessible).
    Test(TestContext) error

    // Run starts the data collection. Run must return an error only if the
    // error is fatal making it impossible for the input to recover.
    Run(Context, publisher.PipelineConnector) error
}
```

You can load input plugins using the `Loader`. With the example above you can load `filestream` and `journald` input plugins. `type` refers to the name of the configuration that tells you what the input is and "filestream" is the default input, if the type is missing.
```golang
plugins := []Plugin{
    filestream.Plugin,
    journald.Plugin,
}
log := logp.NewLogger("inputs")
input.NewLoader(log, plugins, "type", "filestream") (*Loader, error)
```

The code under `pkg/publisher` is coming from Beats. It should be reworked so we can publish events using the shipper. I moved the code here temporarily to make sure tests pass.

Please review commits from 0a79947. The ones before that are just adding new files from beats.

## Why is it important?

Provide the basic architecture so we can start implementing and moving inputs.
  • Loading branch information
kvch authored Jun 7, 2022
1 parent 6b72054 commit 69feca1
Show file tree
Hide file tree
Showing 123 changed files with 12,368 additions and 1,340 deletions.
37 changes: 18 additions & 19 deletions .ci/Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -57,25 +57,24 @@ pipeline {
// }
// }
//}
// There is nothing to test. Step must be enabled when we add code.
//stage('Test') {
// steps {
// withGithubNotify(context: "Test") {
// deleteDir()
// unstash 'source'
// dir("${BASE_DIR}"){
// withGoEnv(){
// goTestJUnit(options: '-v ./...', output: 'junit-report.xml')
// }
// }
// }
// }
// post {
// always {
// junit(allowEmptyResults: true, keepLongStdio: true, testResults: '**/junit-report.xml')
// }
// }
//}
stage('Test') {
steps {
withGithubNotify(context: "Test") {
deleteDir()
unstash 'source'
dir("${BASE_DIR}"){
withGoEnv(){
goTestJUnit(options: '-v ./...', output: 'junit-report.xml')
}
}
}
}
post {
always {
junit(allowEmptyResults: true, keepLongStdio: true, testResults: '**/junit-report.xml')
}
}
}
}
post {
cleanup {
Expand Down
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ linters-settings:
# minimal length of string constant, 3 by default
min-len: 3
# minimal occurrences count to trigger, 3 by default
min-occurrences: 2
min-occurrences: 5

dupl:
# tokens count to trigger issue, 150 by default
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG-developer.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ This project adheres to [Semantic Versioning](http://semver.org/).

### Added

- Move initial version of input package from Filebeat. #17

### Changed

### Deprecated
Expand Down
3,648 changes: 2,364 additions & 1,284 deletions NOTICE.txt

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion dev-tools/templates/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ linters-settings:
# minimal length of string constant, 3 by default
min-len: 3
# minimal occurrences count to trigger, 3 by default
min-occurrences: 2
min-occurrences: 5

dupl:
# tokens count to trigger issue, 150 by default
Expand Down
22 changes: 19 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,39 @@ go 1.17

require (
github.com/elastic/elastic-agent-libs v0.2.4
github.com/elastic/go-concert v0.2.0
github.com/elastic/go-licenser v0.4.0
github.com/elastic/go-structform v0.0.9
github.com/gofrs/uuid v3.3.0+incompatible
github.com/google/go-cmp v0.5.6
github.com/magefile/mage v1.13.0
github.com/stretchr/testify v1.7.0
github.com/urso/sderr v0.0.0-20210525210834-52b04e8f5c71
go.elastic.co/go-licence-detector v0.5.0
go.uber.org/atomic v1.9.0
golang.org/x/sys v0.0.0-20220209214540-3681064d5158
)

require (
github.com/cyphar/filepath-securejoin v0.2.2 // indirect
github.com/cyphar/filepath-securejoin v0.2.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/elastic/go-ucfg v0.8.5 // indirect
github.com/gobuffalo/here v0.6.0 // indirect
github.com/google/licenseclassifier v0.0.0-20200402202327-879cb1424de0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/karrick/godirwalk v1.15.6 // indirect
github.com/markbates/pkger v0.17.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/sergi/go-diff v1.1.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
github.com/stretchr/objx v0.2.0 // indirect
github.com/urso/diag v0.0.0-20200210123136-21b3cc8eb797 // indirect
go.elastic.co/ecszap v1.0.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/mod v0.5.1 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
51 changes: 20 additions & 31 deletions go.sum

Large diffs are not rendered by default.

79 changes: 79 additions & 0 deletions pkg/feature/bundle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package feature

// FilterFunc is the function use to filter elements from a bundle.
type FilterFunc = func(Featurable) bool

// Bundleable merges featurable and bundle interface together.
type bundleable interface {
Features() []Featurable
}

// Bundle defines a list of features available in the current beat.
type Bundle struct {
features []Featurable
}

// NewBundle creates a new Bundle of feature to be registered.
func NewBundle(features ...Featurable) *Bundle {
return &Bundle{features: features}
}

// FilterWith takes a predicate and return a list of filtered bundle matching the predicate.
func (b *Bundle) FilterWith(pred FilterFunc) *Bundle {
var filtered []Featurable

for _, feature := range b.features {
if pred(feature) {
filtered = append(filtered, feature)
}
}
return NewBundle(filtered...)
}

// Filter creates a new bundle with only the feature matching the requested stability.
func (b *Bundle) Filter(stabilities ...Stability) *Bundle {
return b.FilterWith(HasStabilityPred(stabilities...))
}

// Features returns the interface features slice so
func (b *Bundle) Features() []Featurable {
return b.features
}

// MustBundle takes existing bundle or features and create a new Bundle with all the merged Features.
func MustBundle(bundle ...bundleable) *Bundle {
var merged []Featurable
for _, feature := range bundle {
merged = append(merged, feature.Features()...)
}
return NewBundle(merged...)
}

// HasStabilityPred returns true if the feature match any of the provided stabilities.
func HasStabilityPred(stabilities ...Stability) FilterFunc {
return func(f Featurable) bool {
for _, s := range stabilities {
if s == f.Description().Stability {
return true
}
}
return false
}
}
71 changes: 71 additions & 0 deletions pkg/feature/bundle_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package feature

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestBundle(t *testing.T) {
factory := func() {}
features := []Featurable{
New("libbeat.outputs", "elasticsearch", factory, Details{Stability: Stable}),
New("libbeat.outputs", "edge", factory, Details{Stability: Experimental}),
New("libbeat.input", "tcp", factory, Details{Stability: Beta}),
}

t.Run("Creates a new Bundle", func(t *testing.T) {
b := NewBundle(features...)
assert.Equal(t, 3, len(b.Features()))
})

t.Run("Filters feature based on Stability", func(t *testing.T) {
b := NewBundle(features...)
new := b.Filter(Experimental)
assert.Equal(t, 1, len(new.Features()))
})

t.Run("Filters feature based on multiple different Stability", func(t *testing.T) {
b := NewBundle(features...)
new := b.Filter(Experimental, Stable)
assert.Equal(t, 2, len(new.Features()))
})

t.Run("Creates a new Bundle from specified feature", func(t *testing.T) {
f1 := New("libbeat.outputs", "elasticsearch", factory, Details{Stability: Stable})
b := MustBundle(f1)
assert.Equal(t, 1, len(b.Features()))
})

t.Run("Creates a new Bundle with grouped features", func(t *testing.T) {
f1 := New("libbeat.outputs", "elasticsearch", factory, Details{Stability: Stable})
f2 := New("libbeat.outputs", "edge", factory, Details{Stability: Experimental})
f3 := New("libbeat.input", "tcp", factory, Details{Stability: Beta})
f4 := New("libbeat.input", "udp", factory, Details{Stability: Beta})

b := MustBundle(
MustBundle(f1),
MustBundle(f2),
MustBundle(f3, f4),
)

assert.Equal(t, 4, len(b.Features()))
})
}
42 changes: 42 additions & 0 deletions pkg/feature/details.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package feature

import "fmt"

// Details minimal information that you must provide when creating a feature.
type Details struct {
Name string
Stability Stability
Deprecated bool
Info string // short info string
Doc string // long doc string
}

func (d Details) String() string {
fmtStr := "name: %s, description: %s (%s)"
if d.Deprecated {
fmtStr = "name: %s, description: %s (deprecated, %s)"
}
return fmt.Sprintf(fmtStr, d.Name, d.Info, d.Stability)
}

// MakeDetails return the minimal information a new feature must provide.
func MakeDetails(fullName string, doc string, stability Stability) Details {
return Details{Name: fullName, Info: doc, Stability: stability}
}
Loading

0 comments on commit 69feca1

Please sign in to comment.