Skip to content

Commit

Permalink
Outputers become Processors
Browse files Browse the repository at this point in the history
  • Loading branch information
Victor Castell committed Oct 19, 2016
1 parent 744dff5 commit 963eb63
Show file tree
Hide file tree
Showing 16 changed files with 98 additions and 100 deletions.
11 changes: 0 additions & 11 deletions builtin/bins/output-log/log_output.go

This file was deleted.

11 changes: 11 additions & 0 deletions builtin/bins/processor-log/log_output.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package main

import (
"github.com/victorcoder/dkron/dkron"
)

type LogOutput struct{}

func (l *LogOutput) Process(execution *dkron.Execution) *dkron.Execution {
return execution
}
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ import (

func main() {
plugin.Serve(&plugin.ServeOpts{
Outputter: new(SyslogOutput),
Processor: new(SyslogOutput),
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ import (

type SyslogOutput struct{}

func (l *SyslogOutput) Output(execution *dkron.Execution) []byte {
func (l *SyslogOutput) Output(execution *dkron.Execution) *dkron.Execution {
logwriter, err := syslog.New(syslog.LOG_INFO, "dkron")
if err == nil {
log.SetOutput(logwriter)
}

log.Print(execution.Output)
return []byte("Output in syslog")
execution.Output = "Output in syslog"
return execution
}
14 changes: 7 additions & 7 deletions dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,16 @@ var (
ErrLeaderNotFound = errors.New("No member leader found in member list")
)

// OutputterFactory is a function type that creates a new instance
// of a outputer.
type OutputterFactory func() (Outputter, error)
// ProcessorFactory is a function type that creates a new instance
// of a processor.
type ProcessorFactory func() (ExecutionProcessor, error)

// AgentCommand run server
type AgentCommand struct {
Ui cli.Ui
Version string
ShutdownCh <-chan struct{}
OutputPlugins map[string]Outputter
Ui cli.Ui
Version string
ShutdownCh <-chan struct{}
ProcessorPlugins map[string]ExecutionProcessor

serf *serf.Serf
config *Config
Expand Down
5 changes: 5 additions & 0 deletions dkron/execution_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package dkron

type ExecutionProcessor interface {
Process(execution *Execution) *Execution
}
6 changes: 1 addition & 5 deletions dkron/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type Job struct {
lock store.Locker

// Output types
Outputs []string `json:"outputs"`
Processors []string `json:"processors"`
}

// Run the job
Expand Down Expand Up @@ -207,7 +207,3 @@ func (j *Job) Unlock() error {

return nil
}

type Executor interface {
Invoke() string
}
5 changes: 0 additions & 5 deletions dkron/outputter.go

This file was deleted.

7 changes: 4 additions & 3 deletions dkron/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ func (rpcs *RPCServer) ExecutionDone(execution Execution, reply *serf.NodeRespon
}

// Get the defined output types for the job, and call them
for _, output := range job.Outputs {
output := rpcs.agent.OutputPlugins[output]
execution.Output = output.Output(&execution)
for _, p := range job.Processors {
processor := rpcs.agent.ProcessorPlugins[p]
e := processor.Process(&execution)
execution = *e
}

// Save the execution to store
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ services:
environment:
- GODEBUG=netdns=go
# Uncomment to use consul
command: bash -c "go build -o dkron-output-log ./builtin/bins/output-log && go build -o dkron-output-syslog ./builtin/bins/output-syslog && go build *.go && ./main agent -server -backend=consul -backend-machine=consul:8500 -join=dkron:8946 -log-level=debug"
command: bash -c "go build -o dkron-processor-log ./builtin/bins/processor-log && go build -o dkron-processor-syslog ./builtin/bins/processor-syslog && go build *.go && ./main agent -server -backend=consul -backend-machine=consul:8500 -join=dkron:8946 -log-level=debug"
# Uncomment to use etcd
# command: bash -c "go build *.go && ./main agent -server -backend=etcd -backend-machine=etcd:4001 -join=dkron:8946 -log-level=debug"
# Uncomment to use zk
Expand Down
51 changes: 51 additions & 0 deletions plugin/execution_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package plugin

import (
"net/rpc"

"github.com/hashicorp/go-plugin"
"github.com/victorcoder/dkron/dkron"
)

type ExecutionProcessorPlugin struct {
Processor dkron.ExecutionProcessor
}

func (p *ExecutionProcessorPlugin) Server(b *plugin.MuxBroker) (interface{}, error) {
return &ExecutionProcessorServer{Broker: b, Impl: p.Processor}, nil
}

func (p *ExecutionProcessorPlugin) Client(b *plugin.MuxBroker, c *rpc.Client) (interface{}, error) {
return &Outputter{Broker: b, Client: c}, nil
}

// Here is an implementation that talks over RPC
type ExecutionProcessor struct {
Broker *plugin.MuxBroker
Client *rpc.Client
}

func (e *ExecutionProcessor) Process(execution *dkron.Execution) *dkron.Execution {
var resp dkron.Execution
err := e.Client.Call("Plugin.Process", execution, &resp)
if err != nil {
// You usually want your interfaces to return errors. If they don't,
// there isn't much other choice here.
panic(err)
}

return resp
}

// Here is the RPC server that Outputter talks to, conforming to
// the requirements of net/rpc
type ExecutionProcessorServer struct {
// This is the real implementation
Broker *plugin.MuxBroker
Processor dkron.ExecutionProcessor
}

func (e *ExecutionProcessorServer) Process(execution *dkron.Execution, resp *dkron.ExecutionProcessor) error {
*resp = s.Processor.Process(execution)
return nil
}
51 changes: 0 additions & 51 deletions plugin/outputter.go

This file was deleted.

2 changes: 1 addition & 1 deletion plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ import (

// PluginMap should be used by clients for the map of plugins.
var PluginMap = map[string]plugin.Plugin{
"outputter": &OutputPlugin{},
"processor": &ExecutionProcessorPlugin{},
}
6 changes: 3 additions & 3 deletions plugin/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
// The constants below are the names of the plugins that can be dispensed
// from the plugin server.
const (
OutputterPluginName = "outputter"
ProcessorPluginName = "processor"
)

// Handshake is the HandshakeConfig used to configure clients and servers.
Expand All @@ -20,7 +20,7 @@ var Handshake = plugin.HandshakeConfig{

// ServeOpts are the configurations to serve a plugin.
type ServeOpts struct {
Outputter dkron.Outputter
Processor dkron.ExecutionProcessor
}

// Serve serves a plugin. This function never returns and should be the final
Expand All @@ -36,6 +36,6 @@ func Serve(opts *ServeOpts) {
// server or client.
func pluginMap(opts *ServeOpts) map[string]plugin.Plugin {
return map[string]plugin.Plugin{
"outputter": &OutputPlugin{Outputter: opts.Outputter},
"processor": &ExecutionProviderPlugin{Processor: opts.Processor},
}
}
20 changes: 10 additions & 10 deletions plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

type Plugins struct {
Outputs map[string]dkron.Outputter
Processors map[string]dkron.ExecutionProcessor
}

// Discover plugins located on disk
Expand All @@ -25,10 +25,10 @@ type Plugins struct {
//
// Whichever file is discoverd LAST wins.
func (p *Plugins) DiscoverPlugins() error {
p.Outputs = make(map[string]dkron.Outputter)
p.Processors = make(map[string]dkron.ExecutionProcessor)

// Look in /etc/dkron/plugins
outputs, err := plugin.Discover("dkron-output-*", filepath.Join("/etc", "dkron", "plugins"))
processors, err := plugin.Discover("dkron-processor-*", filepath.Join("/etc", "dkron", "plugins"))
if err != nil {
return err
}
Expand All @@ -39,13 +39,13 @@ func (p *Plugins) DiscoverPlugins() error {
if err != nil {
logrus.WithError(err).Error("Error loading exe directory")
} else {
outputs, err = plugin.Discover("dkron-output-*", filepath.Dir(exePath))
processors, err = plugin.Discover("dkron-processor-*", filepath.Dir(exePath))
if err != nil {
return err
}
}

for _, file := range outputs {
for _, file := range processors {
// If the filename has a ".", trim up to there
// if idx := strings.Index(file, "."); idx >= 0 {
// file = file[:idx]
Expand All @@ -57,14 +57,14 @@ func (p *Plugins) DiscoverPlugins() error {
continue
}

outputter, _ := p.outputterFactory(file)
p.Outputs[parts[2]] = outputter
processor, _ := p.processorFactory(file)
p.Processors[parts[2]] = processor
}

return nil
}

func (p *Plugins) outputterFactory(path string) (dkron.Outputter, error) {
func (p *Plugins) processorFactory(path string) (dkron.ExecutionProcessor, error) {
// Build the plugin client configuration and init the plugin
var config plugin.ClientConfig
config.Cmd = exec.Command(path)
Expand All @@ -80,10 +80,10 @@ func (p *Plugins) outputterFactory(path string) (dkron.Outputter, error) {
return nil, err
}

raw, err := rpcClient.Dispense(dkplugin.OutputterPluginName)
raw, err := rpcClient.Dispense(dkplugin.ProcessorPluginName)
if err != nil {
return nil, err
}

return raw.(dkron.Outputter), nil
return raw.(dkron.ExecutionProcessor), nil
}

0 comments on commit 963eb63

Please sign in to comment.