Skip to content

Commit

Permalink
output plugins config based on ucfg
Browse files Browse the repository at this point in the history
- update output plugins to use ucfg.Config
- removing MothershipConfig
- local config.go with default config for each output plugin
  • Loading branch information
urso committed Mar 2, 2016
1 parent 8c2effb commit 5ef56c0
Show file tree
Hide file tree
Showing 27 changed files with 476 additions and 418 deletions.
4 changes: 2 additions & 2 deletions libbeat/beat/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import (
"sync"

"github.com/satori/go.uuid"
"github.com/urso/ucfg"

"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/service"
)
Expand Down Expand Up @@ -81,7 +81,7 @@ const (

// BeatConfig struct contains the basic configuration of every beat
type BeatConfig struct {
Output map[string]outputs.MothershipConfig
Output map[string]*ucfg.Config
Logging logp.Logging
Shipper publisher.ShipperConfig
}
Expand Down
11 changes: 11 additions & 0 deletions libbeat/outputs/console/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package console

type config struct {
Pretty bool `config:"pretty"`
}

var (
defaultConfig = config{
Pretty: false,
}
)
23 changes: 13 additions & 10 deletions libbeat/outputs/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"encoding/json"
"os"

"github.com/urso/ucfg"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
Expand All @@ -15,20 +17,21 @@ func init() {

type plugin struct{}

func (p plugin) NewOutput(
config *outputs.MothershipConfig,
topologyExpire int,
) (outputs.Outputer, error) {
pretty := config.Pretty != nil && *config.Pretty
return newConsole(pretty), nil
type console struct {
config config
}

type console struct {
pretty bool
func (p plugin) NewOutput(config *ucfg.Config, _ int) (outputs.Outputer, error) {
c := &console{config: defaultConfig}
err := config.Unpack(&c.config)
if err != nil {
return nil, err
}
return c, nil
}

func newConsole(pretty bool) *console {
return &console{pretty}
return &console{config{pretty}}
}

func writeBuffer(buf []byte) error {
Expand All @@ -52,7 +55,7 @@ func (c *console) PublishEvent(
var jsonEvent []byte
var err error

if c.pretty {
if c.config.Pretty {
jsonEvent, err = json.MarshalIndent(event, "", " ")
} else {
jsonEvent, err = json.Marshal(event)
Expand Down
44 changes: 44 additions & 0 deletions libbeat/outputs/elasticsearch/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package elasticsearch

import "github.com/elastic/beats/libbeat/outputs"

type elasticsearchConfig struct {
Protocol string `config:"protocol"`
Path string `config:"path"`
Params map[string]string `config:"parameters"`
Username string `config:"username"`
Password string `config:"password"`
ProxyURL string `config:"proxy_url"`
Index string `config:"index"`
LoadBalance bool `config:"loadbalance"`
TLS *outputs.TLSConfig `config:"tls"`
MaxRetries int `config:"max_retries"`
Timeout int `config:"timeout"`
SaveTopology bool `config:"save_topology"`
Template Template `config:"template"`
}

type Template struct {
Name string `config:"name"`
Path string `config:"path"`
Overwrite bool `config:"overwrite"`
}

const (
defaultBulkSize = 50
)

var (
defaultConfig = elasticsearchConfig{
Protocol: "",
Path: "",
ProxyURL: "",
Params: nil,
Username: "",
Password: "",
Timeout: 90,
MaxRetries: 3,
TLS: nil,
LoadBalance: true,
}
)
71 changes: 30 additions & 41 deletions libbeat/outputs/elasticsearch/output.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,33 @@
package elasticsearch

import (
"bytes"
"crypto/tls"
"errors"
"io/ioutil"
"net/url"
"strings"
"time"

"bytes"
"io/ioutil"
"github.com/urso/ucfg"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/mode"
)

var debug = logp.MakeDebug("elasticsearch")
type elasticsearchOutputPlugin struct{}

type elasticsearchOutput struct {
index string
mode mode.ConnectionMode
topology
}

var (
debug = logp.MakeDebug("elasticsearch")
)

var (
// ErrNotConnected indicates failure due to client having no valid connection
Expand All @@ -29,71 +40,49 @@ var (
ErrResponseRead = errors.New("bulk item status parse failed.")
)

const (
defaultMaxRetries = 3

defaultBulkSize = 50

elasticsearchDefaultTimeout = 90 * time.Second
)

func init() {
outputs.RegisterOutputPlugin("elasticsearch", elasticsearchOutputPlugin{})
}

type elasticsearchOutputPlugin struct{}

type elasticsearchOutput struct {
index string
mode mode.ConnectionMode

topology
}

// NewOutput instantiates a new output plugin instance publishing to elasticsearch.
func (f elasticsearchOutputPlugin) NewOutput(
config *outputs.MothershipConfig,
cfg *ucfg.Config,
topologyExpire int,
) (outputs.Outputer, error) {

// configure bulk size in config in case it is not set
if config.BulkMaxSize == nil {
bulkSize := defaultBulkSize
config.BulkMaxSize = &bulkSize
if !cfg.HasField("bulk_max_size") {
cfg.SetInt("bulk_max_size", 0, defaultBulkSize)
}

output := &elasticsearchOutput{}
err := output.init(*config, topologyExpire)
err := output.init(cfg, topologyExpire)
if err != nil {
return nil, err
}
return output, nil
}

func (out *elasticsearchOutput) init(
config outputs.MothershipConfig,
cfg *ucfg.Config,
topologyExpire int,
) error {
config := defaultConfig
if err := cfg.Unpack(&config); err != nil {
return err
}

tlsConfig, err := outputs.LoadTLSConfig(config.TLS)
if err != nil {
return err
}

clients, err := mode.MakeClients(config, makeClientFactory(tlsConfig, config))

clients, err := mode.MakeClients(cfg, makeClientFactory(tlsConfig, &config))
if err != nil {
return err
}

timeout := elasticsearchDefaultTimeout
if config.Timeout != 0 {
timeout = time.Duration(config.Timeout) * time.Second
}
timeout := time.Duration(config.Timeout) * time.Second

maxRetries := defaultMaxRetries
if config.MaxRetries != nil {
maxRetries = *config.MaxRetries
}
maxRetries := config.MaxRetries
maxAttempts := maxRetries + 1 // maximum number of send attempts (-1 = infinite)
if maxRetries < 0 {
maxAttempts = 0
Expand All @@ -103,7 +92,7 @@ func (out *elasticsearchOutput) init(
var maxWaitRetry = time.Duration(60) * time.Second

out.clients = clients
loadBalance := config.LoadBalance == nil || *config.LoadBalance
loadBalance := config.LoadBalance
m, err := mode.NewConnectionMode(clients, !loadBalance,
maxAttempts, waitRetry, timeout, maxWaitRetry)
if err != nil {
Expand Down Expand Up @@ -143,7 +132,7 @@ func (out *elasticsearchOutput) init(

// loadTemplate checks if the index mapping template should be loaded
// In case template loading is enabled, template is written to index
func loadTemplate(config outputs.Template, clients []mode.ProtocolClient) {
func loadTemplate(config Template, clients []mode.ProtocolClient) {
// Check if template should be loaded
// Not being able to load the template will output an error but will not stop execution
if config.Name != "" && len(clients) > 0 {
Expand Down Expand Up @@ -183,7 +172,7 @@ func loadTemplate(config outputs.Template, clients []mode.ProtocolClient) {

func makeClientFactory(
tls *tls.Config,
config outputs.MothershipConfig,
config *elasticsearchConfig,
) func(string) (mode.ProtocolClient, error) {
return func(host string) (mode.ProtocolClient, error) {
esURL, err := getURL(config.Protocol, config.Path, host)
Expand Down
29 changes: 16 additions & 13 deletions libbeat/outputs/elasticsearch/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"
"time"

"github.com/urso/ucfg"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
Expand All @@ -23,20 +25,21 @@ func createElasticsearchConnection(flushInterval int, bulkSize int) elasticsearc
logp.Err("Invalid port. Cannot be converted to in: %s", GetEsPort())
}

var output elasticsearchOutput
output.init(outputs.MothershipConfig{
SaveTopology: true,
Host: GetEsHost(),
Port: esPort,
Username: os.Getenv("ES_USER"),
Password: os.Getenv("ES_PASS"),
Path: "",
Index: index,
Protocol: "http",
FlushInterval: &flushInterval,
BulkMaxSize: &bulkSize,
}, 10)
config, _ := ucfg.NewFrom(map[string]interface{}{
"save_topology": true,
"hosts": []string{GetEsHost()},
"port": esPort,
"username": os.Getenv("ES_USER"),
"password": os.Getenv("ES_PASS"),
"path": "",
"index": index,
"protocol": "http",
"flush_interval": flushInterval,
"bulk_max_size": bulkSize,
})

var output elasticsearchOutput
output.init(config, 10)
return output
}

Expand Down
16 changes: 16 additions & 0 deletions libbeat/outputs/fileout/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package fileout

type config struct {
Index string `config:"index"`
Path string `config:"path"`
Filename string `config:"filename"`
RotateEveryKb int `config:"rotate_every_kb"`
NumberOfFiles int `config:"number_of_files"`
}

var (
defaultConfig = config{
NumberOfFiles: 7,
RotateEveryKb: 10 * 1024,
}
)
Loading

0 comments on commit 5ef56c0

Please sign in to comment.