Skip to content

Commit

Permalink
Remove topology map
Browse files Browse the repository at this point in the history
This was deprecated in 5.0 and is removed for 6.0.
  • Loading branch information
ruflin committed Mar 21, 2017
1 parent cd5d8c1 commit 2c1e9dc
Show file tree
Hide file tree
Showing 24 changed files with 38 additions and 509 deletions.
21 changes: 0 additions & 21 deletions libbeat/docs/generalconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -89,27 +89,6 @@ fields:
region: us-east-1
------------------------------------------------------------------------------

===== refresh_topology_freq

deprecated[5.0.0]

The refresh interval of the topology map in
seconds. In other words, this setting specifies how often each Beat publishes its
IP addresses to the topology map. The default is 10 seconds.

===== topology_expire

deprecated[5.0.0]

The expiration time for the topology in seconds. This is useful in case a Beat
stops publishing its IP addresses. The IP addresses are removed automatically
from the topology map after expiration.

This setting is used only by the Redis output. The other outputs don't support
expiring entries.

The default is 15 seconds.

===== queue_size

The internal queue size for single events in the processing pipeline. The default
Expand Down
16 changes: 3 additions & 13 deletions libbeat/docs/outputconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -333,16 +333,6 @@ The number of seconds to wait for new events between two bulk API index requests
If `bulk_max_size` is reached before this interval expires, additional bulk index
requests are made.

[[save_topology]]
===== save_topology

deprecated[5.0.0]

A Boolean that specifies whether the topology is kept in Elasticsearch. The default is
false.

This option is relevant for Packetbeat only.

===== ssl

Configuration options for SSL parameters like the certificate authority to use
Expand Down Expand Up @@ -430,11 +420,11 @@ The default value is true.
[[hosts]]
===== hosts

The list of known Logstash servers to connect to. If load balancing is disabled, but
multiple hosts are configured, one host is selected randomly (there is no precedence).
The list of known Logstash servers to connect to. If load balancing is disabled, but
multiple hosts are configured, one host is selected randomly (there is no precedence).
If one host becomes unreachable, another one is selected randomly.

All entries in this list can contain a port number. If no port number is given, the
All entries in this list can contain a port number. If no port number is given, the
value specified for <<port>> is used as the default port number.

===== compression_level
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type console struct {
codec outputs.Codec
}

func New(_ common.BeatInfo, config *common.Config, _ int) (outputs.Outputer, error) {
func New(_ common.BeatInfo, config *common.Config) (outputs.Outputer, error) {
var unpackedConfig Config
err := config.Unpack(&unpackedConfig)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions libbeat/outputs/elasticsearch/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func TestOutputLoadTemplate(t *testing.T) {
t.Fatal(err)
}

output, err := New(common.BeatInfo{Beat: "libbeat"}, cfg, 0)
output, err := New(common.BeatInfo{Beat: "libbeat"}, cfg)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -422,7 +422,7 @@ func connectTestEs(t *testing.T, cfg interface{}) (outputs.BulkOutputer, *Client
t.Fatal(err)
}

output, err := New(common.BeatInfo{Beat: "libbeat"}, config, 0)
output, err := New(common.BeatInfo{Beat: "libbeat"}, config)
if err != nil {
t.Fatal(err)
}
Expand Down
1 change: 0 additions & 1 deletion libbeat/outputs/elasticsearch/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ type elasticsearchConfig struct {
TLS *outputs.TLSConfig `config:"ssl"`
MaxRetries int `config:"max_retries"`
Timeout time.Duration `config:"timeout"`
SaveTopology bool `config:"save_topology"`
Template Template `config:"template"`
}

Expand Down
5 changes: 2 additions & 3 deletions libbeat/outputs/elasticsearch/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ var (
)

// New instantiates a new output plugin instance publishing to elasticsearch.
func New(beat common.BeatInfo, cfg *common.Config, topologyExpire int) (outputs.Outputer, error) {
func New(beat common.BeatInfo, cfg *common.Config) (outputs.Outputer, error) {
if !cfg.HasField("bulk_max_size") {
cfg.SetInt("bulk_max_size", -1, defaultBulkSize)
}
Expand All @@ -65,7 +65,7 @@ func New(beat common.BeatInfo, cfg *common.Config, topologyExpire int) (outputs.
}

output := &elasticsearchOutput{beat: beat}
err := output.init(cfg, topologyExpire)
err := output.init(cfg)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -159,7 +159,6 @@ func NewElasticsearchClients(cfg *common.Config) ([]Client, error) {

func (out *elasticsearchOutput) init(
cfg *common.Config,
topologyExpire int,
) error {
config := defaultConfig
if err := cfg.Unpack(&config); err != nil {
Expand Down
3 changes: 1 addition & 2 deletions libbeat/outputs/elasticsearch/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ func createElasticsearchConnection(flushInterval int, bulkSize int) *elasticsear
}

config, _ := common.NewConfigFrom(map[string]interface{}{
"save_topology": true,
"hosts": []string{GetEsHost()},
"port": esPort,
"username": os.Getenv("ES_USER"),
Expand All @@ -40,7 +39,7 @@ func createElasticsearchConnection(flushInterval int, bulkSize int) *elasticsear
})

output := &elasticsearchOutput{beat: common.BeatInfo{Beat: "test"}}
output.init(config, 10)
output.init(config)
return output
}

Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/fileout/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type fileOutput struct {
}

// New instantiates a new file output instance.
func New(beat common.BeatInfo, cfg *common.Config, _ int) (outputs.Outputer, error) {
func New(beat common.BeatInfo, cfg *common.Config) (outputs.Outputer, error) {
config := defaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ var (
)

// New instantiates a new kafka output instance.
func New(_ common.BeatInfo, cfg *common.Config, topologyExpire int) (outputs.Outputer, error) {
func New(_ common.BeatInfo, cfg *common.Config) (outputs.Outputer, error) {
output := &kafka{}
err := output.init(cfg)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/kafka/kafka_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func TestKafkaPublish(t *testing.T) {
// create output within function scope to guarantee
// output is properly closed between single tests
func() {
tmp, err := New(common.BeatInfo{Beat: "libbeat"}, cfg, 0)
tmp, err := New(common.BeatInfo{Beat: "libbeat"}, cfg)
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func init() {
outputs.RegisterOutputPlugin("logstash", new)
}

func new(beat common.BeatInfo, cfg *common.Config, _ int) (outputs.Outputer, error) {
func new(beat common.BeatInfo, cfg *common.Config) (outputs.Outputer, error) {

if !cfg.HasField("index") {
cfg.SetString("index", -1, beat.Beat)
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/logstash/logstash_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func newTestElasticsearchOutput(t *testing.T, test string) *testOutputer {
"template.enabled": false,
})

output, err := plugin(common.BeatInfo{Beat: "libbeat"}, config, 10)
output, err := plugin(common.BeatInfo{Beat: "libbeat"}, config)
if err != nil {
t.Fatalf("init elasticsearch output plugin failed: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/logstash/logstash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func newTestLumberjackOutput(
}

cfg, _ := common.NewConfigFrom(config)
output, err := plugin(common.BeatInfo{}, cfg, 0)
output, err := plugin(common.BeatInfo{}, cfg)
if err != nil {
t.Fatalf("init logstash output plugin failed: %v", err)
}
Expand Down
14 changes: 2 additions & 12 deletions libbeat/outputs/outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,6 @@ type Outputer interface {
Close() error
}

type TopologyOutputer interface {
// Register the agent name and its IPs to the topology map
PublishIPs(name string, localAddrs []string) error

// Get the agent name with a specific IP from the topology map
GetNameByIP(ip string) string
}

// BulkOutputer adds BulkPublish to publish batches of events without looping.
// Outputers still might loop on events or use more efficient bulk-apis if present.
type BulkOutputer interface {
Expand All @@ -53,12 +45,11 @@ type BulkOutputer interface {
}

// Create and initialize the output plugin
type OutputBuilder func(beat common.BeatInfo, config *common.Config, topologyExpire int) (Outputer, error)
type OutputBuilder func(beat common.BeatInfo, config *common.Config) (Outputer, error)

// Functions to be exported by a output plugin
type OutputInterface interface {
Outputer
TopologyOutputer
}

type OutputPlugin struct {
Expand Down Expand Up @@ -92,7 +83,6 @@ func FindOutputPlugin(name string) OutputBuilder {
func InitOutputs(
beat common.BeatInfo,
configs map[string]*common.Config,
topologyExpire int,
) ([]OutputPlugin, error) {
var plugins []OutputPlugin
for name, plugin := range outputsPlugins {
Expand All @@ -106,7 +96,7 @@ func InitOutputs(
continue
}

output, err := plugin(beat, config, topologyExpire)
output, err := plugin(beat, config)
if err != nil {
logp.Err("failed to initialize %s plugin as output: %s", name, err)
return nil, err
Expand Down
15 changes: 3 additions & 12 deletions libbeat/outputs/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (

type redisOut struct {
mode mode.ConnectionMode
topology
beat common.BeatInfo
}

Expand All @@ -40,15 +39,15 @@ func init() {
outputs.RegisterOutputPlugin("redis", new)
}

func new(beat common.BeatInfo, cfg *common.Config, expireTopo int) (outputs.Outputer, error) {
func new(beat common.BeatInfo, cfg *common.Config) (outputs.Outputer, error) {
r := &redisOut{beat: beat}
if err := r.init(cfg, expireTopo); err != nil {
if err := r.init(cfg); err != nil {
return nil, err
}
return r, nil
}

func (r *redisOut) init(cfg *common.Config, expireTopo int) error {
func (r *redisOut) init(cfg *common.Config) error {
config := defaultConfig
if err := cfg.Unpack(&config); err != nil {
return err
Expand Down Expand Up @@ -112,14 +111,6 @@ func (r *redisOut) init(cfg *common.Config, expireTopo int) error {
},
}

// configure topology support
r.topology.init(transp, topoConfig{
host: config.HostTopology,
password: config.PasswordTopology,
db: config.DbTopology,
expire: time.Duration(expireTopo) * time.Second,
})

// configure publisher clients
clients, err := modeutil.MakeClients(cfg, func(host string) (mode.ProtocolClient, error) {

Expand Down
Loading

0 comments on commit 2c1e9dc

Please sign in to comment.