Skip to content

Commit

Permalink
Experimental go1.8 plugin support (#3217)
Browse files Browse the repository at this point in the history
- requires linux, go1.8 and cgo
- plugins export a `Bundle` symbol of type `map[string][]interface{}`.
  The `key` is special to every pluggable module in beats
- all pluggable beats modules provide a `PluginX` function to create a valid
  bundle
- using `plugin.Bundle`, multiple bundles can be combined into one. A bundle
  can contain plugins for different beats and be loaded by all these different
  beats.
- plugins can be loaded via CLI only using the `-plugin <path>` flag
- add plugin support for:
  - libbeat outputs
  - libbeat processors
  - packetbeat protocol analyzers
  - metricbeat modules
  - heartbeat monitors

See PR #3217 for more details
  • Loading branch information
Steffen Siering authored and ruflin committed Jan 13, 2017
1 parent 15991e4 commit 760290f
Show file tree
Hide file tree
Showing 12 changed files with 374 additions and 0 deletions.
30 changes: 30 additions & 0 deletions heartbeat/monitors/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package monitors

import (
"errors"

"github.com/elastic/beats/libbeat/plugin"
)

type monitorPlugin struct {
name string
typ Type
builder ActiveBuilder
}

var pluginKey = "heartbeat.monitor"

func ActivePlugin(name string, b ActiveBuilder) map[string][]interface{} {
return plugin.MakePlugin(pluginKey, monitorPlugin{name, ActiveMonitor, b})
}

func init() {
plugin.MustRegisterLoader(pluginKey, func(ifc interface{}) error {
p, ok := ifc.(monitorPlugin)
if !ok {
return errors.New("plugin does not match monitor plugin type")
}

return Registry.Register(p.name, p.typ, p.builder)
})
}
5 changes: 5 additions & 0 deletions libbeat/beat/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/paths"
"github.com/elastic/beats/libbeat/plugin"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/publisher"
svc "github.com/elastic/beats/libbeat/service"
Expand Down Expand Up @@ -155,6 +156,10 @@ func (b *Beat) launch(bt Creator) error {
return err
}

if err := plugin.Initialize(); err != nil {
return err
}

svc.BeforeRun()
defer svc.Cleanup()

Expand Down
38 changes: 38 additions & 0 deletions libbeat/outputs/codecs/codecs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package codecs

import (
"errors"
"fmt"

"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/plugin"
)

type codecPlugin struct {
name string
factory outputs.CodecFactory
}

var pluginKey = "libbeat.output.codec"

func Plugin(name string, f outputs.CodecFactory) map[string][]interface{} {
return plugin.MakePlugin(name, codecPlugin{name, f})
}

func init() {
plugin.MustRegisterLoader(pluginKey, func(ifc interface{}) (err error) {
b, ok := ifc.(codecPlugin)
if !ok {
return errors.New("plugin does not match output codec plugin type")
}

defer func() {
if msg := recover(); msg != nil {
err = fmt.Errorf("%s", msg)
}
}()

outputs.RegisterOutputCodec(b.name, b.factory)
return
})
}
36 changes: 36 additions & 0 deletions libbeat/outputs/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package outputs

import (
"errors"
"fmt"

p "github.com/elastic/beats/libbeat/plugin"
)

type outputPlugin struct {
name string
builder OutputBuilder
}

var pluginKey = "libbeat.output"

func Plugin(name string, l OutputBuilder) map[string][]interface{} {
return p.MakePlugin(pluginKey, outputPlugin{name, l})
}

func init() {
p.MustRegisterLoader(pluginKey, func(ifc interface{}) error {
b, ok := ifc.(outputPlugin)
if !ok {
return errors.New("plugin does not match output plugin type")
}

name := b.name
if outputsPlugins[name] != nil {
return fmt.Errorf("output type %v already registered", name)
}

RegisterOutputPlugin(name, b.builder)
return nil
})
}
47 changes: 47 additions & 0 deletions libbeat/plugin/cli.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//+build linux,go1.8,cgo

package plugin

import (
"flag"
"strings"

"github.com/elastic/beats/libbeat/logp"
)

type pluginList struct {
paths []string
}

func (p *pluginList) String() string {
return strings.Join(p.paths, ",")
}

func (p *pluginList) Set(v string) error {
// TODO: check file exists

p.paths = append(p.paths, v)
return nil
}

var plugins = &pluginList{}

func init() {
flag.Var(plugins, "plugin", "Load additional plugins")
}

func Initialize() error {
if len(plugins.paths) > 0 {
logp.Warn("EXPERIMENTAL: loadable plugin support is experimental")
}

for _, path := range plugins.paths {
logp.Info("loading plugin bundle: %v", path)

if err := LoadPlugins(path); err != nil {
return err
}
}

return nil
}
7 changes: 7 additions & 0 deletions libbeat/plugin/cli_stub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
//+build !linux !go1.8 !cgo

package plugin

func Initialize() error {
return nil
}
41 changes: 41 additions & 0 deletions libbeat/plugin/load.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
//+build linux,go1.8,cgo

package plugin

import (
"errors"
goplugin "plugin"
)

func loadPlugins(path string) error {
p, err := goplugin.Open(path)
if err != nil {
return err
}

sym, err := p.Lookup("Bundle")
if err != nil {
return err
}

ptr, ok := sym.(*map[string][]interface{})
if !ok {
return errors.New("invalid bundle type")
}

bundle := *ptr
for name, plugins := range bundle {
loader := registry[name]
if loader == nil {
continue
}

for _, plugin := range plugins {
if err := loader(plugin); err != nil {
return err
}
}
}

return nil
}
11 changes: 11 additions & 0 deletions libbeat/plugin/load_stub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
//+build !linux !go1.8 !cgo

package plugin

import "errors"

var errNotSupported = errors.New("plugins not supported")

func loadPlugins(path string) error {
return errNotSupported
}
48 changes: 48 additions & 0 deletions libbeat/plugin/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package plugin

import "fmt"

type PluginLoader func(p interface{}) error

var registry = map[string]PluginLoader{}

func Bundle(
bundles ...map[string][]interface{},
) map[string][]interface{} {
ret := map[string][]interface{}{}

for _, bundle := range bundles {
for name, plugins := range bundle {
ret[name] = append(ret[name], plugins...)
}
}

return ret
}

func MakePlugin(key string, ifc interface{}) map[string][]interface{} {
return map[string][]interface{}{
key: {ifc},
}
}

func MustRegisterLoader(name string, l PluginLoader) {
err := RegisterLoader(name, l)
if err != nil {
panic(err)
}
}

func RegisterLoader(name string, l PluginLoader) error {
if l := registry[name]; l != nil {
return fmt.Errorf("plugin loader '%v' already registered", name)
}

registry[name] = l
return nil
}

func LoadPlugins(path string) error {
// TODO: add flag to enable/disable plugins?
return loadPlugins(path)
}
25 changes: 25 additions & 0 deletions libbeat/processors/registry.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,35 @@
package processors

import (
"errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
p "github.com/elastic/beats/libbeat/plugin"
)

type processorPlugin struct {
name string
constr Constructor
}

var pluginKey = "libbeat.processor"

func Plugin(name string, c Constructor) map[string][]interface{} {
return p.MakePlugin(pluginKey, processorPlugin{name, c})
}

func init() {
p.MustRegisterLoader(pluginKey, func(ifc interface{}) error {
p, ok := ifc.(processorPlugin)
if !ok {
return errors.New("plugin does not match processor plugin type")
}

return registry.Register(p.name, p.constr)
})
}

type Processor interface {
Run(event common.MapStr) (common.MapStr, error)
String() string
Expand Down
55 changes: 55 additions & 0 deletions metricbeat/module/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package module

import (
"errors"

"github.com/elastic/beats/libbeat/plugin"

"github.com/elastic/beats/metricbeat/mb"
)

type modulePlugin struct {
name string
factory mb.ModuleFactory
metricsets map[string]mb.MetricSetFactory
}

const pluginKey = "metricbeat.module"

func init() {
plugin.MustRegisterLoader(pluginKey, func(ifc interface{}) error {
p, ok := ifc.(modulePlugin)
if !ok {
return errors.New("plugin does not match metricbeat module plugin type")
}

if p.factory != nil {
if err := mb.Registry.AddModule(p.name, p.factory); err != nil {
return err
}
}

for name, factory := range p.metricsets {
if err := mb.Registry.AddMetricSet(p.name, name, factory); err != nil {
return err
}
}

return nil
})
}

func Plugin(
module string,
factory mb.ModuleFactory,
metricsets map[string]mb.MetricSetFactory,
) map[string][]interface{} {
return plugin.MakePlugin(pluginKey, modulePlugin{module, factory, metricsets})
}

func MetricSetsPlugin(
module string,
metricsets map[string]mb.MetricSetFactory,
) map[string][]interface{} {
return Plugin(module, nil, metricsets)
}
Loading

0 comments on commit 760290f

Please sign in to comment.