diff --git a/heartbeat/monitors/plugin.go b/heartbeat/monitors/plugin.go new file mode 100644 index 00000000000..6f6c3890d31 --- /dev/null +++ b/heartbeat/monitors/plugin.go @@ -0,0 +1,30 @@ +package monitors + +import ( + "errors" + + "github.com/elastic/beats/libbeat/plugin" +) + +type monitorPlugin struct { + name string + typ Type + builder ActiveBuilder +} + +var pluginKey = "heartbeat.monitor" + +func ActivePlugin(name string, b ActiveBuilder) map[string][]interface{} { + return plugin.MakePlugin(pluginKey, monitorPlugin{name, ActiveMonitor, b}) +} + +func init() { + plugin.MustRegisterLoader(pluginKey, func(ifc interface{}) error { + p, ok := ifc.(monitorPlugin) + if !ok { + return errors.New("plugin does not match monitor plugin type") + } + + return Registry.Register(p.name, p.typ, p.builder) + }) +} diff --git a/libbeat/beat/beat.go b/libbeat/beat/beat.go index b50bf5bda28..13b0f2b7b36 100644 --- a/libbeat/beat/beat.go +++ b/libbeat/beat/beat.go @@ -46,6 +46,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/paths" + "github.com/elastic/beats/libbeat/plugin" "github.com/elastic/beats/libbeat/processors" "github.com/elastic/beats/libbeat/publisher" svc "github.com/elastic/beats/libbeat/service" @@ -155,6 +156,10 @@ func (b *Beat) launch(bt Creator) error { return err } + if err := plugin.Initialize(); err != nil { + return err + } + svc.BeforeRun() defer svc.Cleanup() diff --git a/libbeat/outputs/codecs/codecs.go b/libbeat/outputs/codecs/codecs.go new file mode 100644 index 00000000000..2d698b2c179 --- /dev/null +++ b/libbeat/outputs/codecs/codecs.go @@ -0,0 +1,38 @@ +package codecs + +import ( + "errors" + "fmt" + + "github.com/elastic/beats/libbeat/outputs" + "github.com/elastic/beats/libbeat/plugin" +) + +type codecPlugin struct { + name string + factory outputs.CodecFactory +} + +var pluginKey = "libbeat.output.codec" + +func Plugin(name string, f outputs.CodecFactory) map[string][]interface{} { + return plugin.MakePlugin(name, codecPlugin{name, f}) +} + +func init() { + plugin.MustRegisterLoader(pluginKey, func(ifc interface{}) (err error) { + b, ok := ifc.(codecPlugin) + if !ok { + return errors.New("plugin does not match output codec plugin type") + } + + defer func() { + if msg := recover(); msg != nil { + err = fmt.Errorf("%s", msg) + } + }() + + outputs.RegisterOutputCodec(b.name, b.factory) + return + }) +} diff --git a/libbeat/outputs/plugin.go b/libbeat/outputs/plugin.go new file mode 100644 index 00000000000..cef87cebfef --- /dev/null +++ b/libbeat/outputs/plugin.go @@ -0,0 +1,36 @@ +package outputs + +import ( + "errors" + "fmt" + + p "github.com/elastic/beats/libbeat/plugin" +) + +type outputPlugin struct { + name string + builder OutputBuilder +} + +var pluginKey = "libbeat.output" + +func Plugin(name string, l OutputBuilder) map[string][]interface{} { + return p.MakePlugin(pluginKey, outputPlugin{name, l}) +} + +func init() { + p.MustRegisterLoader(pluginKey, func(ifc interface{}) error { + b, ok := ifc.(outputPlugin) + if !ok { + return errors.New("plugin does not match output plugin type") + } + + name := b.name + if outputsPlugins[name] != nil { + return fmt.Errorf("output type %v already registered", name) + } + + RegisterOutputPlugin(name, b.builder) + return nil + }) +} diff --git a/libbeat/plugin/cli.go b/libbeat/plugin/cli.go new file mode 100644 index 00000000000..5849c48e923 --- /dev/null +++ b/libbeat/plugin/cli.go @@ -0,0 +1,47 @@ +//+build linux,go1.8,cgo + +package plugin + +import ( + "flag" + "strings" + + "github.com/elastic/beats/libbeat/logp" +) + +type pluginList struct { + paths []string +} + +func (p *pluginList) String() string { + return strings.Join(p.paths, ",") +} + +func (p *pluginList) Set(v string) error { + // TODO: check file exists + + p.paths = append(p.paths, v) + return nil +} + +var plugins = &pluginList{} + +func init() { + flag.Var(plugins, "plugin", "Load additional plugins") +} + +func Initialize() error { + if len(plugins.paths) > 0 { + logp.Warn("EXPERIMENTAL: loadable plugin support is experimental") + } + + for _, path := range plugins.paths { + logp.Info("loading plugin bundle: %v", path) + + if err := LoadPlugins(path); err != nil { + return err + } + } + + return nil +} diff --git a/libbeat/plugin/cli_stub.go b/libbeat/plugin/cli_stub.go new file mode 100644 index 00000000000..4253aae393c --- /dev/null +++ b/libbeat/plugin/cli_stub.go @@ -0,0 +1,7 @@ +//+build !linux !go1.8 !cgo + +package plugin + +func Initialize() error { + return nil +} diff --git a/libbeat/plugin/load.go b/libbeat/plugin/load.go new file mode 100644 index 00000000000..dd6f839d260 --- /dev/null +++ b/libbeat/plugin/load.go @@ -0,0 +1,41 @@ +//+build linux,go1.8,cgo + +package plugin + +import ( + "errors" + goplugin "plugin" +) + +func loadPlugins(path string) error { + p, err := goplugin.Open(path) + if err != nil { + return err + } + + sym, err := p.Lookup("Bundle") + if err != nil { + return err + } + + ptr, ok := sym.(*map[string][]interface{}) + if !ok { + return errors.New("invalid bundle type") + } + + bundle := *ptr + for name, plugins := range bundle { + loader := registry[name] + if loader == nil { + continue + } + + for _, plugin := range plugins { + if err := loader(plugin); err != nil { + return err + } + } + } + + return nil +} diff --git a/libbeat/plugin/load_stub.go b/libbeat/plugin/load_stub.go new file mode 100644 index 00000000000..b369aa16b00 --- /dev/null +++ b/libbeat/plugin/load_stub.go @@ -0,0 +1,11 @@ +//+build !linux !go1.8 !cgo + +package plugin + +import "errors" + +var errNotSupported = errors.New("plugins not supported") + +func loadPlugins(path string) error { + return errNotSupported +} diff --git a/libbeat/plugin/plugin.go b/libbeat/plugin/plugin.go new file mode 100644 index 00000000000..8454c96eaa4 --- /dev/null +++ b/libbeat/plugin/plugin.go @@ -0,0 +1,48 @@ +package plugin + +import "fmt" + +type PluginLoader func(p interface{}) error + +var registry = map[string]PluginLoader{} + +func Bundle( + bundles ...map[string][]interface{}, +) map[string][]interface{} { + ret := map[string][]interface{}{} + + for _, bundle := range bundles { + for name, plugins := range bundle { + ret[name] = append(ret[name], plugins...) + } + } + + return ret +} + +func MakePlugin(key string, ifc interface{}) map[string][]interface{} { + return map[string][]interface{}{ + key: {ifc}, + } +} + +func MustRegisterLoader(name string, l PluginLoader) { + err := RegisterLoader(name, l) + if err != nil { + panic(err) + } +} + +func RegisterLoader(name string, l PluginLoader) error { + if l := registry[name]; l != nil { + return fmt.Errorf("plugin loader '%v' already registered", name) + } + + registry[name] = l + return nil +} + +func LoadPlugins(path string) error { + // TODO: add flag to enable/disable plugins? + return loadPlugins(path) +} diff --git a/libbeat/processors/registry.go b/libbeat/processors/registry.go index 5d0a1d740f4..04e48373bda 100644 --- a/libbeat/processors/registry.go +++ b/libbeat/processors/registry.go @@ -1,10 +1,35 @@ package processors import ( + "errors" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" + p "github.com/elastic/beats/libbeat/plugin" ) +type processorPlugin struct { + name string + constr Constructor +} + +var pluginKey = "libbeat.processor" + +func Plugin(name string, c Constructor) map[string][]interface{} { + return p.MakePlugin(pluginKey, processorPlugin{name, c}) +} + +func init() { + p.MustRegisterLoader(pluginKey, func(ifc interface{}) error { + p, ok := ifc.(processorPlugin) + if !ok { + return errors.New("plugin does not match processor plugin type") + } + + return registry.Register(p.name, p.constr) + }) +} + type Processor interface { Run(event common.MapStr) (common.MapStr, error) String() string diff --git a/metricbeat/module/plugin.go b/metricbeat/module/plugin.go new file mode 100644 index 00000000000..2d314b54a19 --- /dev/null +++ b/metricbeat/module/plugin.go @@ -0,0 +1,55 @@ +package module + +import ( + "errors" + + "github.com/elastic/beats/libbeat/plugin" + + "github.com/elastic/beats/metricbeat/mb" +) + +type modulePlugin struct { + name string + factory mb.ModuleFactory + metricsets map[string]mb.MetricSetFactory +} + +const pluginKey = "metricbeat.module" + +func init() { + plugin.MustRegisterLoader(pluginKey, func(ifc interface{}) error { + p, ok := ifc.(modulePlugin) + if !ok { + return errors.New("plugin does not match metricbeat module plugin type") + } + + if p.factory != nil { + if err := mb.Registry.AddModule(p.name, p.factory); err != nil { + return err + } + } + + for name, factory := range p.metricsets { + if err := mb.Registry.AddMetricSet(p.name, name, factory); err != nil { + return err + } + } + + return nil + }) +} + +func Plugin( + module string, + factory mb.ModuleFactory, + metricsets map[string]mb.MetricSetFactory, +) map[string][]interface{} { + return plugin.MakePlugin(pluginKey, modulePlugin{module, factory, metricsets}) +} + +func MetricSetsPlugin( + module string, + metricsets map[string]mb.MetricSetFactory, +) map[string][]interface{} { + return Plugin(module, nil, metricsets) +} diff --git a/packetbeat/protocols/plugin.go b/packetbeat/protocols/plugin.go new file mode 100644 index 00000000000..f2596330953 --- /dev/null +++ b/packetbeat/protocols/plugin.go @@ -0,0 +1,31 @@ +package protocols + +import ( + "errors" + + "github.com/elastic/beats/libbeat/plugin" + "github.com/elastic/beats/packetbeat/protos" +) + +type protocolPlugin struct { + name string + p protos.ProtocolPlugin +} + +const pluginKey = "packetbeat.protocol" + +func init() { + plugin.MustRegisterLoader(pluginKey, func(ifc interface{}) error { + p, ok := ifc.(protocolPlugin) + if !ok { + return errors.New("plugin does not match protocol plugin type") + } + + protos.Register(p.name, p.p) + return nil + }) +} + +func Plugin(name string, p protos.ProtocolPlugin) map[string][]interface{} { + return plugin.MakePlugin(pluginKey, protocolPlugin{name, p}) +}