From d47c55a676f44481ccf04b731b611b6c8ae9a2b1 Mon Sep 17 00:00:00 2001 From: Vijay Samuel Date: Mon, 1 May 2017 03:08:41 -0700 Subject: [PATCH] Make kubernetes indexers/matchers pluggable (#4151) --- CHANGELOG.asciidoc | 1 + libbeat/processors/kubernetes/indexing.go | 10 ++-- libbeat/processors/kubernetes/registry.go | 63 +++++++++++++++++++++++ 3 files changed, 69 insertions(+), 5 deletions(-) create mode 100644 libbeat/processors/kubernetes/registry.go diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 7ccb3ae4365..779122860cc 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -112,6 +112,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff] - Add support for include_labels and include_annotations in kubernetes processor {pull}4043[4043] - Support new `index_patterns` field when loading templates for Elasticsearch >= 6.0 {pull}4056[4056] - Adding goimports support to make check and fmt {pull}4114[4114] +- Make kubernetes indexers/matchers pluggable {pull}4151[4151] *Filebeat* diff --git a/libbeat/processors/kubernetes/indexing.go b/libbeat/processors/kubernetes/indexing.go index b296146911e..c53408cfec6 100644 --- a/libbeat/processors/kubernetes/indexing.go +++ b/libbeat/processors/kubernetes/indexing.go @@ -67,20 +67,20 @@ type Matchers struct { // Register contains Indexer and Matchers to use on pod indexing and event matching type Register struct { sync.RWMutex - indexers map[string]IndexConstructor + indexers map[string]IndexerConstructor matchers map[string]MatcherConstructor defaultIndexerConfigs map[string]common.Config defaultMatcherConfigs map[string]common.Config } -type IndexConstructor func(config common.Config, genMeta GenMeta) (Indexer, error) +type IndexerConstructor func(config common.Config, genMeta GenMeta) (Indexer, error) type MatcherConstructor func(config common.Config) (Matcher, error) // NewRegister creates and returns a new Register. func NewRegister() *Register { return &Register{ - indexers: make(map[string]IndexConstructor, 0), + indexers: make(map[string]IndexerConstructor, 0), matchers: make(map[string]MatcherConstructor, 0), defaultIndexerConfigs: make(map[string]common.Config, 0), @@ -89,7 +89,7 @@ func NewRegister() *Register { } // AddIndexer to the register -func (r *Register) AddIndexer(name string, indexer IndexConstructor) { +func (r *Register) AddIndexer(name string, indexer IndexerConstructor) { r.RWMutex.Lock() defer r.RWMutex.Unlock() r.indexers[name] = indexer @@ -113,7 +113,7 @@ func (r *Register) AddDefaultMatcherConfig(name string, config common.Config) { } // AddIndexer to the register -func (r *Register) GetIndexer(name string) IndexConstructor { +func (r *Register) GetIndexer(name string) IndexerConstructor { indexer, ok := r.indexers[name] if ok { return indexer diff --git a/libbeat/processors/kubernetes/registry.go b/libbeat/processors/kubernetes/registry.go new file mode 100644 index 00000000000..463db4eb910 --- /dev/null +++ b/libbeat/processors/kubernetes/registry.go @@ -0,0 +1,63 @@ +package kubernetes + +import ( + "errors" + "fmt" + + p "github.com/elastic/beats/libbeat/plugin" +) + +var ( + indexerKey = "libbeat.processor.kubernetes.indexer" + matcherKey = "libbeat.processor.kubernetes.matcher" +) + +type indexerPlugin struct { + name string + constructor IndexerConstructor +} + +func IndexerPlugin(name string, c IndexerConstructor) map[string][]interface{} { + return p.MakePlugin(indexerKey, indexerPlugin{name, c}) +} + +type matcherPlugin struct { + name string + constructor MatcherConstructor +} + +func MatcherPlugin(name string, m MatcherConstructor) map[string][]interface{} { + return p.MakePlugin(matcherKey, matcherPlugin{name, m}) +} + +func init() { + p.MustRegisterLoader(indexerKey, func(ifc interface{}) error { + i, ok := ifc.(indexerPlugin) + if !ok { + return errors.New("plugin does not match output plugin type") + } + + name := i.name + if Indexing.indexers[name] != nil { + return fmt.Errorf("indexer type %v already registered", name) + } + + Indexing.AddIndexer(name, i.constructor) + return nil + }) + + p.MustRegisterLoader(matcherKey, func(ifc interface{}) error { + m, ok := ifc.(matcherPlugin) + if !ok { + return errors.New("plugin does not match output plugin type") + } + + name := m.name + if Indexing.indexers[name] != nil { + return fmt.Errorf("matcher type %v already registered", name) + } + + Indexing.AddMatcher(name, m.constructor) + return nil + }) +}