diff --git a/backends/config.go b/backends/config.go index 2e966b65..253bcb6b 100644 --- a/backends/config.go +++ b/backends/config.go @@ -14,6 +14,7 @@ import ( "github.com/HeavyHorst/remco/backends/etcd" "github.com/HeavyHorst/remco/backends/file" "github.com/HeavyHorst/remco/backends/mock" + "github.com/HeavyHorst/remco/backends/plugin" "github.com/HeavyHorst/remco/backends/redis" "github.com/HeavyHorst/remco/backends/vault" "github.com/HeavyHorst/remco/backends/zookeeper" @@ -29,6 +30,7 @@ type Config struct { Redis *redis.Config Zookeeper *zookeeper.Config Mock *mock.Config + Plugin []plugin.Plugin } func (c *Config) GetBackends() []template.BackendConfig { diff --git a/backends/plugin/plugin.go b/backends/plugin/plugin.go new file mode 100644 index 00000000..02e7261f --- /dev/null +++ b/backends/plugin/plugin.go @@ -0,0 +1,89 @@ +/* + * This file is part of remco. + * © 2016 The Remco Authors + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +package plugin + +import ( + "context" + "net/rpc" + "net/rpc/jsonrpc" + "os" + "path" + + "github.com/HeavyHorst/easyKV" + berr "github.com/HeavyHorst/remco/backends/error" + "github.com/HeavyHorst/remco/log" + "github.com/HeavyHorst/remco/template" + "github.com/Sirupsen/logrus" + "github.com/natefinch/pie" +) + +type plug struct { + client *rpc.Client +} + +// Plugin represents the config for a plugin. +type Plugin struct { + // the path to the plugin executable + Path string + Config map[string]interface{} + template.Backend +} + +// Connect creates the connection to the plugin and sends the config map to the same. +func (p *Plugin) Connect() (template.Backend, error) { + if p == nil { + return template.Backend{}, berr.ErrNilConfig + } + + p.Backend.Name = path.Base(p.Path) + + client, err := pie.StartProviderCodec(jsonrpc.NewClientCodec, os.Stderr, p.Path) + if err != nil { + return p.Backend, err + } + + if p.Backend.Watch { + log.WithFields(logrus.Fields{ + "backend": p.Backend.Name, + }).Warn("Watch is not supported, using interval instead") + p.Backend.Watch = false + } + + plugin := &plug{client} + if err := plugin.Init(p.Config); err != nil { + return p.Backend, err + } + + p.Backend.ReadWatcher = plugin + return p.Backend, nil +} + +// Init sends the config map to the plugin +// the plugin can then run some initialization tasks +func (p *plug) Init(config map[string]interface{}) error { + var result bool + return p.client.Call("Plugin.Init", config, &result) +} + +// GetValues queries the plugin for keys +func (p *plug) GetValues(keys []string) (result map[string]string, err error) { + err = p.client.Call("Plugin.GetValues", keys, &result) + return result, err +} + +// Close closes the client connection +func (p *plug) Close() { + p.client.Call("Plugin.Close", nil, nil) + p.client.Close() +} + +// WatchPrefix is not supported for now +func (p *plug) WatchPrefix(prefix string, ctx context.Context, opts ...easyKV.WatchOption) (uint64, error) { + return 0, easyKV.ErrWatchNotSupported +} diff --git a/config/config.go b/config/config.go index ae1d356b..60f943a7 100644 --- a/config/config.go +++ b/config/config.go @@ -140,10 +140,15 @@ func (c *Configuration) configureLogger() { func (r *Resource) Init(ctx context.Context, reapLock *sync.RWMutex) (*template.Resource, error) { var backendList []template.Backend + backendConfigs := r.Backend.GetBackends() + + for _, v := range r.Backend.Plugin { + backendConfigs = append(backendConfigs, &v) + } // try to connect to all backends // connection to all backends must succeed to continue - for _, config := range r.Backend.GetBackends() { + for _, config := range backendConfigs { retryloop: for { select { diff --git a/vendor/github.com/natefinch/pie/LICENSE b/vendor/github.com/natefinch/pie/LICENSE new file mode 100644 index 00000000..cc38d640 --- /dev/null +++ b/vendor/github.com/natefinch/pie/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2015 Nate Finch + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/vendor/github.com/natefinch/pie/README.md b/vendor/github.com/natefinch/pie/README.md new file mode 100644 index 00000000..79698373 --- /dev/null +++ b/vendor/github.com/natefinch/pie/README.md @@ -0,0 +1,183 @@ +# pie [![GoDoc](https://godoc.org/github.com/natefinch/pie?status.svg)](https://godoc.org/github.com/natefinch/pie) [![Build Status](https://drone.io/github.com/natefinch/pie/status.png)](https://drone.io/github.com/natefinch/pie/latest) [![Join the chat at https://gitter.im/natefinch/pie](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/natefinch/pie?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) + + import "github.com/natefinch/pie" + +package pie provides a toolkit for creating plugins for Go applications. + +![pie](https://cloud.githubusercontent.com/assets/3185864/7804562/bc35d256-0332-11e5-8562-fe00ec4d10b2.png) + +**Why is it called pie?** + +Because if you pronounce API like "a pie", then all this consuming and serving +of APIs becomes a lot more palatable. Also, pies are the ultimate pluggable +interface - depending on what's inside, you can get dinner, dessert, a snack, or +even breakfast. Plus, then I get to say that plugins in Go are as easy as... +well, you know. + +If you have to explain it to your boss, just say it's an acronym for Plug In +Executables. (but it's not, really) + +## About Pie + +Plugins using this toolkit and the applications managing those plugins +communicate via RPC over the plugin application's Stdin and Stdout. + +Functions in this package with the prefix `New` are intended to be used by the +plugin to set up its end of the communication. Functions in this package +with the prefix `Start` are intended to be used by the main application to set +up its end of the communication and start a plugin executable. + + + +This package provides two conceptually different types of plugins, based on +which side of the communication is the server and which is the client. +Plugins which provide an API server for the main application to call are +called Providers. Plugins which consume an API provided by the main +application are called Consumers. + +The default codec for RPC for this package is Go's gob encoding, however you +may provide your own codec, such as JSON-RPC provided by net/rpc/jsonrpc. + +There is no requirement that plugins for applications using this toolkit be +written in Go. As long as the plugin application can consume or provide an +RPC API of the correct codec, it can interoperate with main applications +using this process. For example, if your main application uses JSON-RPC, +many languages are capable of producing an executable that can provide a +JSON-RPC API for your application to use. + +Included in this repo are some simple examples of a master process and a +plugin process, to see how the library can be used. An example of the +standard plugin that provides an API the master process consumes is in the +examples/provider directory. master\_provider expects plugin\_provider to be +in the same directory or in your $PATH. You can just go install both of +them, and it'll work correctly. + +In addition to a regular plugin that provides an API, this package can be +used for plugins that consume an API provided by the main process. To see an +example of this, look in the examples/consumer folder. + + +## func NewConsumer +``` go +func NewConsumer() *rpc.Client +``` +NewConsumer returns an rpc.Client that will consume an API from the host +process over this application's Stdin and Stdout using gob encoding. + + +## func NewConsumerCodec +``` go +func NewConsumerCodec(f func(io.ReadWriteCloser) rpc.ClientCodec) *rpc.Client +``` +NewConsumerCodec returns an rpc.Client that will consume an API from the host +process over this application's Stdin and Stdout using the ClientCodec +returned by f. + + +## func StartProvider +``` go +func StartProvider(output io.Writer, path string, args ...string) (*rpc.Client, error) +``` +StartProvider start a provider-style plugin application at the given path and +args, and returns an RPC client that communicates with the plugin using gob +encoding over the plugin's Stdin and Stdout. The writer passed to output +will receive output from the plugin's stderr. Closing the RPC client +returned from this function will shut down the plugin application. + + +## func StartProviderCodec +``` go +func StartProviderCodec( + f func(io.ReadWriteCloser) rpc.ClientCodec, + output io.Writer, + path string, + args ...string, +) (*rpc.Client, error) +``` +StartProviderCodec starts a provider-style plugin application at the given +path and args, and returns an RPC client that communicates with the plugin +using the ClientCodec returned by f over the plugin's Stdin and Stdout. The +writer passed to output will receive output from the plugin's stderr. +Closing the RPC client returned from this function will shut down the plugin +application. + + +## type Server +``` go +type Server struct { + // contains filtered or unexported fields +} +``` +Server is a type that represents an RPC server that serves an API over +stdin/stdout. + + +### func NewProvider +``` go +func NewProvider() Server +``` +NewProvider returns a Server that will serve RPC over this +application's Stdin and Stdout. This method is intended to be run by the +plugin application. + + +### func StartConsumer +``` go +func StartConsumer(output io.Writer, path string, args ...string) (Server, error) +``` +StartConsumer starts a consumer-style plugin application with the given path +and args, writing its stderr to output. The plugin consumes an API this +application provides. The function returns the Server for this host +application, which should be used to register APIs for the plugin to consume. + + +### func (Server) Close +``` go +func (s Server) Close() error +``` +Close closes the connection with the client. If the client is a plugin +process, the process will be stopped. Further communication using this +Server will fail. + + +### func (Server) Register +``` go +func (s Server) Register(rcvr interface{}) error +``` +Register publishes in the provider the set of methods of the receiver value +that satisfy the following conditions: + + + - exported method + - two arguments, both of exported type + - the second argument is a pointer + - one return value, of type error + +It returns an error if the receiver is not an exported type or has no +suitable methods. It also logs the error using package log. The client +accesses each method using a string of the form "Type.Method", where Type is +the receiver's concrete type. + + +### func (Server) RegisterName +``` go +func (s Server) RegisterName(name string, rcvr interface{}) error +``` +RegisterName is like Register but uses the provided name for the type +instead of the receiver's concrete type. + + +### func (Server) Serve +``` go +func (s Server) Serve() +``` +Serve starts the Server's RPC server, serving via gob encoding. This call +will block until the client hangs up. + + +### func (Server) ServeCodec +``` go +func (s Server) ServeCodec(f func(io.ReadWriteCloser) rpc.ServerCodec) +``` +ServeCodec starts the Server's RPC server, serving via the encoding returned +by f. This call will block until the client hangs up. diff --git a/vendor/github.com/natefinch/pie/doc.go b/vendor/github.com/natefinch/pie/doc.go new file mode 100644 index 00000000..436851c6 --- /dev/null +++ b/vendor/github.com/natefinch/pie/doc.go @@ -0,0 +1,37 @@ +// Package pie provides a toolkit for creating plugins for Go applications. +// +// Plugins using this toolkit and the applications managing those plugins +// communicate via RPC over the plugin application's Stdin and Stdout. +// +// Functions in this package with the prefix New are intended to be used by the +// plugin to set up its end of the communication. Functions in this package +// with the prefix Start are intended to be used by the main application to set +// up its end of the communication and run a plugin executable. +// +// This package provides two conceptually different types of plugins, based on +// which side of the communication is the server and which is the client. +// Plugins which provide an API server for the main application to call are +// called Providers. Plugins which consume an API provided by the main +// application are called Consumers. +// +// The default codec for RPC for this package is Go's gob encoding, however you +// may provide your own codec, such as JSON-RPC provided by net/rpc/jsonrpc. +// +// There is no requirement that plugins for applications using this toolkit be +// written in Go. As long as the plugin application can consume or provide an +// RPC API of the correct codec, it can interoperate with main applications +// using this process. For example, if your main application uses JSON-RPC, +// many languages are capable of producing an executable that can provide a +// JSON-RPC API for your application to use. +// +// Included in this repo are some simple examples of a master process and a +// plugin process, to see how the library can be used. An example of the +// standard plugin that provides an API the master process consumes is in the +// exmaples/provider directory. master_provider expects plugin_provider to be +// in the same directory or in your $PATH. You can just go install both of +// them, and it'll work correctly. + +// In addition to a regular plugin that provides an API, this package can be +// used for plugins that consume an API provided by the main process. To see an +// example of this, look in the examples/consumer folder. +package pie diff --git a/vendor/github.com/natefinch/pie/pie.go b/vendor/github.com/natefinch/pie/pie.go new file mode 100644 index 00000000..465b35b9 --- /dev/null +++ b/vendor/github.com/natefinch/pie/pie.go @@ -0,0 +1,260 @@ +package pie + +import ( + "errors" + "fmt" + "io" + "net/rpc" + "os" + "os/exec" + "time" +) + +var errProcStopTimeout = errors.New("process killed after timeout waiting for process to stop") + +// NewProvider returns a Server that will serve RPC over this +// application's Stdin and Stdout. This method is intended to be run by the +// plugin application. +func NewProvider() Server { + return Server{ + server: rpc.NewServer(), + rwc: rwCloser{os.Stdin, os.Stdout}, + } +} + +// Server is a type that represents an RPC server that serves an API over +// stdin/stdout. +type Server struct { + server *rpc.Server + rwc io.ReadWriteCloser + codec rpc.ServerCodec +} + +// Close closes the connection with the client. If the client is a plugin +// process, the process will be stopped. Further communication using this +// Server will fail. +func (s Server) Close() error { + if s.codec != nil { + return s.codec.Close() + } + return s.rwc.Close() +} + +// Serve starts the Server's RPC server, serving via gob encoding. This call +// will block until the client hangs up. +func (s Server) Serve() { + s.server.ServeConn(s.rwc) +} + +// ServeCodec starts the Server's RPC server, serving via the encoding returned +// by f. This call will block until the client hangs up. +func (s Server) ServeCodec(f func(io.ReadWriteCloser) rpc.ServerCodec) { + s.server.ServeCodec(f(s.rwc)) +} + +// Register publishes in the provider the set of methods of the receiver value +// that satisfy the following conditions: +// +// - exported method +// - two arguments, both of exported type +// - the second argument is a pointer +// - one return value, of type error +// +// It returns an error if the receiver is not an exported type or has no +// suitable methods. It also logs the error using package log. The client +// accesses each method using a string of the form "Type.Method", where Type is +// the receiver's concrete type. +func (s Server) Register(rcvr interface{}) error { + return s.server.Register(rcvr) +} + +// RegisterName is like Register but uses the provided name for the type +// instead of the receiver's concrete type. +func (s Server) RegisterName(name string, rcvr interface{}) error { + return s.server.RegisterName(name, rcvr) +} + +// StartProvider start a provider-style plugin application at the given path and +// args, and returns an RPC client that communicates with the plugin using gob +// encoding over the plugin's Stdin and Stdout. The writer passed to output +// will receive output from the plugin's stderr. Closing the RPC client +// returned from this function will shut down the plugin application. +func StartProvider(output io.Writer, path string, args ...string) (*rpc.Client, error) { + pipe, err := start(makeCommand(output, path, args)) + if err != nil { + return nil, err + } + return rpc.NewClient(pipe), nil +} + +// StartProviderCodec starts a provider-style plugin application at the given +// path and args, and returns an RPC client that communicates with the plugin +// using the ClientCodec returned by f over the plugin's Stdin and Stdout. The +// writer passed to output will receive output from the plugin's stderr. +// Closing the RPC client returned from this function will shut down the plugin +// application. +func StartProviderCodec( + f func(io.ReadWriteCloser) rpc.ClientCodec, + output io.Writer, + path string, + args ...string, +) (*rpc.Client, error) { + pipe, err := start(makeCommand(output, path, args)) + if err != nil { + return nil, err + } + return rpc.NewClientWithCodec(f(pipe)), nil +} + +// StartConsumer starts a consumer-style plugin application with the given path +// and args, writing its stderr to output. The plugin consumes an API this +// application provides. The function returns the Server for this host +// application, which should be used to register APIs for the plugin to consume. +func StartConsumer(output io.Writer, path string, args ...string) (Server, error) { + pipe, err := start(makeCommand(output, path, args)) + if err != nil { + return Server{}, err + } + return Server{ + server: rpc.NewServer(), + rwc: pipe, + }, nil +} + +// NewConsumer returns an rpc.Client that will consume an API from the host +// process over this application's Stdin and Stdout using gob encoding. +func NewConsumer() *rpc.Client { + return rpc.NewClient(rwCloser{os.Stdin, os.Stdout}) +} + +// NewConsumerCodec returns an rpc.Client that will consume an API from the host +// process over this application's Stdin and Stdout using the ClientCodec +// returned by f. +func NewConsumerCodec(f func(io.ReadWriteCloser) rpc.ClientCodec) *rpc.Client { + return rpc.NewClientWithCodec(f(rwCloser{os.Stdin, os.Stdout})) +} + +// start runs the plugin and returns an ioPipe that can be used to control the +// plugin. +func start(cmd commander) (_ ioPipe, err error) { + in, err := cmd.StdinPipe() + if err != nil { + return ioPipe{}, err + } + defer func() { + if err != nil { + in.Close() + } + }() + out, err := cmd.StdoutPipe() + if err != nil { + return ioPipe{}, err + } + defer func() { + if err != nil { + out.Close() + } + }() + + proc, err := cmd.Start() + if err != nil { + return ioPipe{}, err + } + return ioPipe{out, in, proc}, nil +} + +// makeCommand is a function that just creates an exec.Cmd and the process in +// it. It exists to facilitate testing. +var makeCommand = func(w io.Writer, path string, args []string) commander { + cmd := exec.Command(path, args...) + cmd.Stderr = w + return execCmd{cmd} +} + +type execCmd struct { + *exec.Cmd +} + +func (e execCmd) Start() (osProcess, error) { + if err := e.Cmd.Start(); err != nil { + return nil, err + } + return e.Cmd.Process, nil +} + +// commander is an interface that is fulfilled by exec.Cmd and makes our testing +// a little easier. +type commander interface { + StdinPipe() (io.WriteCloser, error) + StdoutPipe() (io.ReadCloser, error) + // Start is like exec.Cmd's start, except it also returns the os.Process if + // start succeeds. + Start() (osProcess, error) +} + +// osProcess is an interface that is fullfilled by *os.Process and makes our +// testing a little easier. +type osProcess interface { + Wait() (*os.ProcessState, error) + Kill() error + Signal(os.Signal) error +} + +// ioPipe simply wraps a ReadCloser, WriteCloser, and a Process, and coordinates +// them so they all close together. +type ioPipe struct { + io.ReadCloser + io.WriteCloser + proc osProcess +} + +// Close closes the pipe's WriteCloser, ReadClosers, and process. +func (iop ioPipe) Close() error { + err := iop.ReadCloser.Close() + if writeErr := iop.WriteCloser.Close(); writeErr != nil { + err = writeErr + } + if procErr := iop.closeProc(); procErr != nil { + err = procErr + } + return err +} + +// procTimeout is the timeout to wait for a process to stop after being +// signalled. It is adjustable to keep tests fast. +var procTimeout = time.Second + +// closeProc sends an interrupt signal to the pipe's process, and if it doesn't +// respond in one second, kills the process. +func (iop ioPipe) closeProc() error { + result := make(chan error, 1) + go func() { _, err := iop.proc.Wait(); result <- err }() + if err := iop.proc.Signal(os.Interrupt); err != nil { + return err + } + select { + case err := <-result: + return err + case <-time.After(procTimeout): + if err := iop.proc.Kill(); err != nil { + return fmt.Errorf("error killing process after timeout: %s", err) + } + return errProcStopTimeout + } +} + +// rwCloser just merges a ReadCloser and a WriteCloser into a ReadWriteCloser. +type rwCloser struct { + io.ReadCloser + io.WriteCloser +} + +// Close closes both the ReadCloser and the WriteCloser, returning the last +// error from either. +func (rw rwCloser) Close() error { + err := rw.ReadCloser.Close() + if err := rw.WriteCloser.Close(); err != nil { + return err + } + return err +} diff --git a/vendor/vendor.json b/vendor/vendor.json index aa473335..6cebeddd 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -387,6 +387,12 @@ "revision": "7ac1091ca77699f898d17a0680e5ed22c9bcfa17", "revisionTime": "2016-10-03T16:37:55Z" }, + { + "checksumSHA1": "cObB/GNrnmBbziYIBnTrFLUViAA=", + "path": "github.com/natefinch/pie", + "revision": "13d3874dc4836d5db81d3a950aa5436b1eb23372", + "revisionTime": "2016-04-20T17:41:31Z" + }, { "checksumSHA1": "mhvIMH8oAtOiEyg37zWKmgb+6v4=", "path": "github.com/pborman/uuid",