diff --git a/libbeat/docs/generalconfig.asciidoc b/libbeat/docs/generalconfig.asciidoc index 08c406519f5..53aed7c3bca 100644 --- a/libbeat/docs/generalconfig.asciidoc +++ b/libbeat/docs/generalconfig.asciidoc @@ -89,27 +89,6 @@ fields: region: us-east-1 ------------------------------------------------------------------------------ -===== refresh_topology_freq - -deprecated[5.0.0] - -The refresh interval of the topology map in -seconds. In other words, this setting specifies how often each Beat publishes its -IP addresses to the topology map. The default is 10 seconds. - -===== topology_expire - -deprecated[5.0.0] - -The expiration time for the topology in seconds. This is useful in case a Beat -stops publishing its IP addresses. The IP addresses are removed automatically -from the topology map after expiration. - -This setting is used only by the Redis output. The other outputs don't support -expiring entries. - -The default is 15 seconds. - ===== queue_size The internal queue size for single events in the processing pipeline. The default diff --git a/libbeat/docs/outputconfig.asciidoc b/libbeat/docs/outputconfig.asciidoc index 163716cf3f2..6b1f411f4ce 100644 --- a/libbeat/docs/outputconfig.asciidoc +++ b/libbeat/docs/outputconfig.asciidoc @@ -333,16 +333,6 @@ The number of seconds to wait for new events between two bulk API index requests If `bulk_max_size` is reached before this interval expires, additional bulk index requests are made. -[[save_topology]] -===== save_topology - -deprecated[5.0.0] - -A Boolean that specifies whether the topology is kept in Elasticsearch. The default is -false. - -This option is relevant for Packetbeat only. - ===== ssl Configuration options for SSL parameters like the certificate authority to use @@ -430,11 +420,11 @@ The default value is true. [[hosts]] ===== hosts -The list of known Logstash servers to connect to. If load balancing is disabled, but -multiple hosts are configured, one host is selected randomly (there is no precedence). +The list of known Logstash servers to connect to. If load balancing is disabled, but +multiple hosts are configured, one host is selected randomly (there is no precedence). If one host becomes unreachable, another one is selected randomly. -All entries in this list can contain a port number. If no port number is given, the +All entries in this list can contain a port number. If no port number is given, the value specified for <> is used as the default port number. ===== compression_level diff --git a/libbeat/outputs/console/console.go b/libbeat/outputs/console/console.go index 748383f5bdf..f1b6b12daf8 100644 --- a/libbeat/outputs/console/console.go +++ b/libbeat/outputs/console/console.go @@ -21,7 +21,7 @@ type console struct { codec outputs.Codec } -func New(_ common.BeatInfo, config *common.Config, _ int) (outputs.Outputer, error) { +func New(_ common.BeatInfo, config *common.Config) (outputs.Outputer, error) { var unpackedConfig Config err := config.Unpack(&unpackedConfig) if err != nil { diff --git a/libbeat/outputs/elasticsearch/client_integration_test.go b/libbeat/outputs/elasticsearch/client_integration_test.go index 426f633fa4c..426a8efb591 100644 --- a/libbeat/outputs/elasticsearch/client_integration_test.go +++ b/libbeat/outputs/elasticsearch/client_integration_test.go @@ -177,7 +177,7 @@ func TestOutputLoadTemplate(t *testing.T) { t.Fatal(err) } - output, err := New(common.BeatInfo{Beat: "libbeat"}, cfg, 0) + output, err := New(common.BeatInfo{Beat: "libbeat"}, cfg) if err != nil { t.Fatal(err) } @@ -422,7 +422,7 @@ func connectTestEs(t *testing.T, cfg interface{}) (outputs.BulkOutputer, *Client t.Fatal(err) } - output, err := New(common.BeatInfo{Beat: "libbeat"}, config, 0) + output, err := New(common.BeatInfo{Beat: "libbeat"}, config) if err != nil { t.Fatal(err) } diff --git a/libbeat/outputs/elasticsearch/config.go b/libbeat/outputs/elasticsearch/config.go index c30f68702db..8eaebc0cc51 100644 --- a/libbeat/outputs/elasticsearch/config.go +++ b/libbeat/outputs/elasticsearch/config.go @@ -19,7 +19,6 @@ type elasticsearchConfig struct { TLS *outputs.TLSConfig `config:"ssl"` MaxRetries int `config:"max_retries"` Timeout time.Duration `config:"timeout"` - SaveTopology bool `config:"save_topology"` Template Template `config:"template"` } diff --git a/libbeat/outputs/elasticsearch/output.go b/libbeat/outputs/elasticsearch/output.go index 24d2c4559a0..27d06d6d8f4 100644 --- a/libbeat/outputs/elasticsearch/output.go +++ b/libbeat/outputs/elasticsearch/output.go @@ -54,7 +54,7 @@ var ( ) // New instantiates a new output plugin instance publishing to elasticsearch. -func New(beat common.BeatInfo, cfg *common.Config, topologyExpire int) (outputs.Outputer, error) { +func New(beat common.BeatInfo, cfg *common.Config) (outputs.Outputer, error) { if !cfg.HasField("bulk_max_size") { cfg.SetInt("bulk_max_size", -1, defaultBulkSize) } @@ -65,7 +65,7 @@ func New(beat common.BeatInfo, cfg *common.Config, topologyExpire int) (outputs. } output := &elasticsearchOutput{beat: beat} - err := output.init(cfg, topologyExpire) + err := output.init(cfg) if err != nil { return nil, err } @@ -159,7 +159,6 @@ func NewElasticsearchClients(cfg *common.Config) ([]Client, error) { func (out *elasticsearchOutput) init( cfg *common.Config, - topologyExpire int, ) error { config := defaultConfig if err := cfg.Unpack(&config); err != nil { diff --git a/libbeat/outputs/elasticsearch/output_test.go b/libbeat/outputs/elasticsearch/output_test.go index 374e6fba88b..5c774ffc9eb 100644 --- a/libbeat/outputs/elasticsearch/output_test.go +++ b/libbeat/outputs/elasticsearch/output_test.go @@ -26,7 +26,6 @@ func createElasticsearchConnection(flushInterval int, bulkSize int) *elasticsear } config, _ := common.NewConfigFrom(map[string]interface{}{ - "save_topology": true, "hosts": []string{GetEsHost()}, "port": esPort, "username": os.Getenv("ES_USER"), @@ -40,7 +39,7 @@ func createElasticsearchConnection(flushInterval int, bulkSize int) *elasticsear }) output := &elasticsearchOutput{beat: common.BeatInfo{Beat: "test"}} - output.init(config, 10) + output.init(config) return output } diff --git a/libbeat/outputs/fileout/file.go b/libbeat/outputs/fileout/file.go index c09dc499b36..53c937c5c63 100644 --- a/libbeat/outputs/fileout/file.go +++ b/libbeat/outputs/fileout/file.go @@ -18,7 +18,7 @@ type fileOutput struct { } // New instantiates a new file output instance. -func New(beat common.BeatInfo, cfg *common.Config, _ int) (outputs.Outputer, error) { +func New(beat common.BeatInfo, cfg *common.Config) (outputs.Outputer, error) { config := defaultConfig if err := cfg.Unpack(&config); err != nil { return nil, err diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index 84815fa0244..5ff86324ceb 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -100,7 +100,7 @@ var ( ) // New instantiates a new kafka output instance. -func New(_ common.BeatInfo, cfg *common.Config, topologyExpire int) (outputs.Outputer, error) { +func New(_ common.BeatInfo, cfg *common.Config) (outputs.Outputer, error) { output := &kafka{} err := output.init(cfg) if err != nil { diff --git a/libbeat/outputs/kafka/kafka_integration_test.go b/libbeat/outputs/kafka/kafka_integration_test.go index 97d0a6669ed..3a9cbbc1114 100644 --- a/libbeat/outputs/kafka/kafka_integration_test.go +++ b/libbeat/outputs/kafka/kafka_integration_test.go @@ -196,7 +196,7 @@ func TestKafkaPublish(t *testing.T) { // create output within function scope to guarantee // output is properly closed between single tests func() { - tmp, err := New(common.BeatInfo{Beat: "libbeat"}, cfg, 0) + tmp, err := New(common.BeatInfo{Beat: "libbeat"}, cfg) if err != nil { t.Fatal(err) } diff --git a/libbeat/outputs/logstash/logstash.go b/libbeat/outputs/logstash/logstash.go index e449d2c3c27..18d152d723d 100644 --- a/libbeat/outputs/logstash/logstash.go +++ b/libbeat/outputs/logstash/logstash.go @@ -46,7 +46,7 @@ func init() { outputs.RegisterOutputPlugin("logstash", new) } -func new(beat common.BeatInfo, cfg *common.Config, _ int) (outputs.Outputer, error) { +func new(beat common.BeatInfo, cfg *common.Config) (outputs.Outputer, error) { if !cfg.HasField("index") { cfg.SetString("index", -1, beat.Beat) diff --git a/libbeat/outputs/logstash/logstash_integration_test.go b/libbeat/outputs/logstash/logstash_integration_test.go index 5a38f126e94..ef4786e49d2 100644 --- a/libbeat/outputs/logstash/logstash_integration_test.go +++ b/libbeat/outputs/logstash/logstash_integration_test.go @@ -160,7 +160,7 @@ func newTestElasticsearchOutput(t *testing.T, test string) *testOutputer { "template.enabled": false, }) - output, err := plugin(common.BeatInfo{Beat: "libbeat"}, config, 10) + output, err := plugin(common.BeatInfo{Beat: "libbeat"}, config) if err != nil { t.Fatalf("init elasticsearch output plugin failed: %v", err) } diff --git a/libbeat/outputs/logstash/logstash_test.go b/libbeat/outputs/logstash/logstash_test.go index 3608092b5be..488e52afe8e 100644 --- a/libbeat/outputs/logstash/logstash_test.go +++ b/libbeat/outputs/logstash/logstash_test.go @@ -74,7 +74,7 @@ func newTestLumberjackOutput( } cfg, _ := common.NewConfigFrom(config) - output, err := plugin(common.BeatInfo{}, cfg, 0) + output, err := plugin(common.BeatInfo{}, cfg) if err != nil { t.Fatalf("init logstash output plugin failed: %v", err) } diff --git a/libbeat/outputs/outputs.go b/libbeat/outputs/outputs.go index c5faf3b56a4..c9b2d48c036 100644 --- a/libbeat/outputs/outputs.go +++ b/libbeat/outputs/outputs.go @@ -37,14 +37,6 @@ type Outputer interface { Close() error } -type TopologyOutputer interface { - // Register the agent name and its IPs to the topology map - PublishIPs(name string, localAddrs []string) error - - // Get the agent name with a specific IP from the topology map - GetNameByIP(ip string) string -} - // BulkOutputer adds BulkPublish to publish batches of events without looping. // Outputers still might loop on events or use more efficient bulk-apis if present. type BulkOutputer interface { @@ -53,12 +45,11 @@ type BulkOutputer interface { } // Create and initialize the output plugin -type OutputBuilder func(beat common.BeatInfo, config *common.Config, topologyExpire int) (Outputer, error) +type OutputBuilder func(beat common.BeatInfo, config *common.Config) (Outputer, error) // Functions to be exported by a output plugin type OutputInterface interface { Outputer - TopologyOutputer } type OutputPlugin struct { @@ -92,7 +83,6 @@ func FindOutputPlugin(name string) OutputBuilder { func InitOutputs( beat common.BeatInfo, configs map[string]*common.Config, - topologyExpire int, ) ([]OutputPlugin, error) { var plugins []OutputPlugin for name, plugin := range outputsPlugins { @@ -106,7 +96,7 @@ func InitOutputs( continue } - output, err := plugin(beat, config, topologyExpire) + output, err := plugin(beat, config) if err != nil { logp.Err("failed to initialize %s plugin as output: %s", name, err) return nil, err diff --git a/libbeat/outputs/redis/redis.go b/libbeat/outputs/redis/redis.go index 40a8789a8e9..3484112a38a 100644 --- a/libbeat/outputs/redis/redis.go +++ b/libbeat/outputs/redis/redis.go @@ -17,7 +17,6 @@ import ( type redisOut struct { mode mode.ConnectionMode - topology beat common.BeatInfo } @@ -40,15 +39,15 @@ func init() { outputs.RegisterOutputPlugin("redis", new) } -func new(beat common.BeatInfo, cfg *common.Config, expireTopo int) (outputs.Outputer, error) { +func new(beat common.BeatInfo, cfg *common.Config) (outputs.Outputer, error) { r := &redisOut{beat: beat} - if err := r.init(cfg, expireTopo); err != nil { + if err := r.init(cfg); err != nil { return nil, err } return r, nil } -func (r *redisOut) init(cfg *common.Config, expireTopo int) error { +func (r *redisOut) init(cfg *common.Config) error { config := defaultConfig if err := cfg.Unpack(&config); err != nil { return err @@ -112,14 +111,6 @@ func (r *redisOut) init(cfg *common.Config, expireTopo int) error { }, } - // configure topology support - r.topology.init(transp, topoConfig{ - host: config.HostTopology, - password: config.PasswordTopology, - db: config.DbTopology, - expire: time.Duration(expireTopo) * time.Second, - }) - // configure publisher clients clients, err := modeutil.MakeClients(cfg, func(host string) (mode.ProtocolClient, error) { diff --git a/libbeat/outputs/redis/redis_integration_test.go b/libbeat/outputs/redis/redis_integration_test.go index 7c7f0c7ef4b..3359643206d 100644 --- a/libbeat/outputs/redis/redis_integration_test.go +++ b/libbeat/outputs/redis/redis_integration_test.go @@ -29,98 +29,6 @@ const ( SRedisDefaultPort = "6380" ) -func TestTopologyInRedisTCP(t *testing.T) { - db := 1 - key := "test_topo_tcp" - redisHosts := []string{getRedisAddr()} - redisConfig := map[string]interface{}{ - "hosts": redisHosts, - "key": key, - "host_topology": redisHosts[0], - "db_topology": db, - "timeout": "5s", - } - - testTopologyInRedis(t, redisConfig) -} - -func TestTopologyInRedisTLS(t *testing.T) { - db := 1 - key := "test_topo_tls" - redisHosts := []string{getSRedisAddr()} - redisConfig := map[string]interface{}{ - "hosts": redisHosts, - "key": key, - "host_topology": redisHosts[0], - "db_topology": db, - "timeout": "5s", - - "ssl.verification_mode": "full", - "ssl.certificate_authorities": []string{ - "../../../testing/environments/docker/sredis/pki/tls/certs/sredis.crt", - }, - } - - testTopologyInRedis(t, redisConfig) -} - -func testTopologyInRedis(t *testing.T, cfg map[string]interface{}) { - tests := []struct { - out *redisOut - name string - ips []string - }{ - {nil, "proxy1", []string{"10.1.0.4"}}, - {nil, "proxy2", []string{"10.1.0.9", "fe80::4e8d:79ff:fef2:de6a"}}, - {nil, "proxy3", []string{"10.1.0.10"}}, - } - - db := 0 - key := cfg["key"].(string) - if v, ok := cfg["db_topology"]; ok { - db = v.(int) - } - - // prepare redis - { - conn, err := redis.Dial("tcp", getRedisAddr(), redis.DialDatabase(db)) - if err != nil { - t.Fatalf("redis.Dial failed %v", err) - } - // delete old key if present - defer conn.Close() - conn.Do("DEL", key) - } - - // 1. connect - for i := range tests { - tests[i].out = newRedisTestingOutput(t, cfg) - defer tests[i].out.Close() - } - - // 2. publish ips twice (so all outputs have same topology map) - for i := 0; i < 2; i++ { - for _, test := range tests { - t.Logf("publish %v ips: %v", test.name, test.ips) - err := test.out.PublishIPs(test.name, test.ips) - assert.NoError(t, err) - } - } - - // 3. check names available - for _, test := range tests { - t.Logf("check %v knows ips", test.name) - for _, other := range tests { - t.Logf(" check ips of %v", other.name) - for _, ip := range other.ips { - name := test.out.GetNameByIP(ip) - t.Logf(" check ip: %v -> %v", ip, other.name == name) - assert.Equal(t, other.name, name) - } - } - } -} - func TestPublishListTCP(t *testing.T) { key := "test_publist_tcp" db := 0 @@ -336,9 +244,6 @@ func getSRedisAddr() string { } func newRedisTestingOutput(t *testing.T, cfg map[string]interface{}) *redisOut { - params := struct { - Expire int `config:"topology_expire"` - }{15} config, err := common.NewConfigFrom(cfg) if err != nil { @@ -350,11 +255,7 @@ func newRedisTestingOutput(t *testing.T, cfg map[string]interface{}) *redisOut { t.Fatalf("redis output module not registered") } - if err := config.Unpack(¶ms); err != nil { - t.Fatalf("Failed to unpack topology_expire: %v", err) - } - - out, err := plugin(common.BeatInfo{Beat: "libbeat"}, config, params.Expire) + out, err := plugin(common.BeatInfo{Beat: "libbeat"}, config) if err != nil { t.Fatalf("Failed to initialize redis output: %v", err) } diff --git a/libbeat/outputs/redis/topology.go b/libbeat/outputs/redis/topology.go deleted file mode 100644 index 0733cee4dcb..00000000000 --- a/libbeat/outputs/redis/topology.go +++ /dev/null @@ -1,107 +0,0 @@ -package redis - -import ( - "strings" - "sync/atomic" - "time" - - "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/outputs/transport" - "github.com/garyburd/redigo/redis" -) - -type topology struct { - transCfg *transport.Config - cfg topoConfig - topologyMap atomic.Value // Value holds a map[string]string -} - -type topoConfig struct { - host string - password string - db int - expire time.Duration -} - -func (t *topology) init(tc *transport.Config, cfg topoConfig) { - *t = topology{transCfg: tc, cfg: cfg} - if t.cfg.host != "" { - t.topologyMap.Store(map[string]string{}) - } -} - -func (t *topology) GetNameByIP(ip string) string { - if t.cfg.host == "" { - return "" - } - - if m, ok := t.topologyMap.Load().(map[string]string); ok { - if name, exists := m[ip]; exists { - return name - } - } - return "" -} - -func (t *topology) PublishIPs(name string, localAddrs []string) error { - if t.cfg.host == "" { - debugf("Not publishing IPs because, no host configured") - } - - dialOpts := []redis.DialOption{ - redis.DialPassword(t.cfg.password), - redis.DialDatabase(t.cfg.db), - } - if t.transCfg != nil { - d, err := transport.MakeDialer(t.transCfg) - if err != nil { - return err - } - dialOpts = append(dialOpts, redis.DialNetDial(d.Dial)) - } - - conn, err := redis.Dial("tcp", t.cfg.host, dialOpts...) - if err != nil { - return err - } - defer conn.Close() - - _, err = conn.Do("HSET", name, "ipaddrs", strings.Join(localAddrs, ",")) - if err != nil { - logp.Err("[%s] Fail to set the IP addresses: %s", name, err) - return err - } - - _, err = conn.Do("EXPIRE", name, int(t.cfg.expire.Seconds())) - if err != nil { - logp.Err("[%s] Fail to set the expiration time: %s", name, err) - return err - } - - t.updateMap(conn) - return nil -} - -func (t *topology) updateMap(conn redis.Conn) { - M := map[string]string{} - hostnames, err := redis.Strings(conn.Do("KEYS", "*")) - if err != nil { - logp.Err("Fail to get the all shippers from the topology map %s", err) - return - } - - for _, host := range hostnames { - res, err := redis.String(conn.Do("HGET", host, "ipaddrs")) - if err != nil { - logp.Err("[%s] Fail to get the IPs: %s", host, err) - continue - } - - for _, addr := range strings.Split(res, ",") { - M[addr] = host - } - } - - t.topologyMap.Store(M) - debugf("Topology %s", M) -} diff --git a/libbeat/publisher/publish.go b/libbeat/publisher/publish.go index 06ae53b71b2..05443e35d1b 100644 --- a/libbeat/publisher/publish.go +++ b/libbeat/publisher/publish.go @@ -4,7 +4,6 @@ import ( "errors" "flag" "sync/atomic" - "time" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/op" @@ -54,22 +53,19 @@ type Publisher interface { } type BeatPublisher struct { - shipperName string // Shipper name as set in the configuration file - hostname string // Host name as returned by the operation system - name string // The shipperName if configured, the hostname otherwise - version string - IPAddrs []string - disabled bool - Index string - Output []*outputWorker - TopologyOutput outputs.TopologyOutputer - geoLite *libgeo.GeoIP - Processors *processors.Processors + shipperName string // Shipper name as set in the configuration file + hostname string // Host name as returned by the operation system + name string // The shipperName if configured, the hostname otherwise + version string + IPAddrs []string + disabled bool + Index string + Output []*outputWorker + geoLite *libgeo.GeoIP + Processors *processors.Processors globalEventMetadata common.EventMetadata // Fields and tags to add to each event. - RefreshTopologyTimer <-chan time.Time - // On shutdown the publisher is finished first and the outputers next, // so no publisher will attempt to send messages on closed channels. // Note: beat data producers must be shutdown before the publisher plugin @@ -89,8 +85,6 @@ type BeatPublisher struct { type ShipperConfig struct { common.EventMetadata `config:",inline"` // Fields and tags to add to each event. Name string `config:"name"` - RefreshTopologyFreq time.Duration `config:"refresh_topology_freq"` - TopologyExpire int `config:"topology_expire"` Geoip common.Geoip `config:"geoip"` // internal publisher queue sizes @@ -99,11 +93,6 @@ type ShipperConfig struct { MaxProcs *int `config:"max_procs"` } -type Topology struct { - Name string `json:"name"` - IP string `json:"ip"` -} - const ( DefaultQueueSize = 1000 DefaultBulkQueueSize = 0 @@ -135,12 +124,6 @@ func (publisher *BeatPublisher) GetServerName(ip string) string { return publisher.name } - // find the shipper with the desired IP - if publisher.TopologyOutput != nil { - logp.Warn("Topology settings are deprecated.") - return publisher.TopologyOutput.GetNameByIP(ip) - } - return "" } @@ -153,36 +136,6 @@ func (publisher *BeatPublisher) Connect() Client { return newClient(publisher) } -func (publisher *BeatPublisher) UpdateTopologyPeriodically() { - for range publisher.RefreshTopologyTimer { - _ = publisher.PublishTopology() // ignore errors - } -} - -func (publisher *BeatPublisher) PublishTopology(params ...string) error { - - localAddrs := params - if len(params) == 0 { - addrs, err := common.LocalIPAddrsAsStrings(false) - if err != nil { - logp.Err("Getting local IP addresses fails with: %s", err) - return err - } - localAddrs = addrs - } - - if publisher.TopologyOutput != nil { - debug("Add topology entry for %s: %s", publisher.name, localAddrs) - - err := publisher.TopologyOutput.PublishIPs(publisher.name, localAddrs) - if err != nil { - return err - } - } - - return nil -} - // Create new PublisherType func New( beat common.BeatInfo, @@ -221,14 +174,13 @@ func (publisher *BeatPublisher) init( publisher.wsOutput.Init() if !publisher.disabled { - plugins, err := outputs.InitOutputs(beat, configs, shipper.TopologyExpire) + plugins, err := outputs.InitOutputs(beat, configs) if err != nil { return err } var outputers []*outputWorker - var topoOutput outputs.TopologyOutputer for _, plugin := range plugins { output := plugin.Output config := plugin.Config @@ -243,29 +195,9 @@ func (publisher *BeatPublisher) init( *shipper.QueueSize, *shipper.BulkQueueSize)) - if ok, _ := config.Bool("save_topology", 0); !ok { - continue - } - - topo, ok := output.(outputs.TopologyOutputer) - if !ok { - logp.Err("Output type %s does not support topology logging", - plugin.Name) - return errors.New("Topology output not supported") - } - - if topoOutput != nil { - logp.Err("Multiple outputs defined to store topology. " + - "Please add save_topology = true option only for one output.") - return errors.New("Multiple outputs defined to store topology") - } - - topoOutput = topo - logp.Info("Using %s to store the topology", plugin.Name) } publisher.Output = outputers - publisher.TopologyOutput = topoOutput } if !publisher.disabled { @@ -273,10 +205,6 @@ func (publisher *BeatPublisher) init( logp.Info("No outputs are defined. Please define one under the output section.") return errors.New("No outputs are defined. Please define one under the output section.") } - - if publisher.TopologyOutput == nil { - logp.Debug("publish", "No output is defined to store the topology. The server fields might not be filled.") - } } publisher.shipperName = shipper.Name @@ -301,25 +229,6 @@ func (publisher *BeatPublisher) init( return err } - if !publisher.disabled && publisher.TopologyOutput != nil { - RefreshTopologyFreq := 10 * time.Second - if shipper.RefreshTopologyFreq != 0 { - RefreshTopologyFreq = shipper.RefreshTopologyFreq - } - publisher.RefreshTopologyTimer = time.Tick(RefreshTopologyFreq) - logp.Info("Topology map refreshed every %s", RefreshTopologyFreq) - - // register shipper and its public IP addresses - err = publisher.PublishTopology() - if err != nil { - logp.Err("Failed to publish topology: %s", err) - return err - } - - // update topology periodically - go publisher.UpdateTopologyPeriodically() - } - publisher.pipelines.async = newAsyncPipeline(publisher, *shipper.QueueSize, *shipper.BulkQueueSize, &publisher.wsPublisher) publisher.pipelines.sync = newSyncPipeline(publisher, *shipper.QueueSize, *shipper.BulkQueueSize) return nil diff --git a/libbeat/publisher/publish_test.go b/libbeat/publisher/publish_test.go deleted file mode 100644 index 4dc66541437..00000000000 --- a/libbeat/publisher/publish_test.go +++ /dev/null @@ -1,80 +0,0 @@ -// +build !integration - -package publisher - -import ( - "runtime" - "testing" - "time" - - "github.com/elastic/beats/libbeat/outputs" - "github.com/stretchr/testify/assert" -) - -const ( - shipperName = "testShipper" - hostOnNetwork = "someHost" -) - -type testTopology struct { - hostname string // Hostname returned by GetNameByIP. - - // Parameters from PublishIPs invocation. - publishName chan string - publishLocalAddrs chan []string -} - -var _ outputs.TopologyOutputer = testTopology{} - -func (topo testTopology) PublishIPs(name string, localAddrs []string) error { - topo.publishName <- name - topo.publishLocalAddrs <- localAddrs - return nil -} - -func (topo testTopology) GetNameByIP(ip string) string { - return topo.hostname -} - -// Test GetServerName. -func TestPublisherTypeGetServerName(t *testing.T) { - pt := &BeatPublisher{name: shipperName} - assert.Equal(t, shipperName, pt.GetServerName("127.0.0.1")) - - // Unknown hosts return empty string. - assert.Equal(t, "", pt.GetServerName("172.0.0.1")) - - // Hostname is returned when topology knows the IP. - pt.TopologyOutput = testTopology{hostname: hostOnNetwork} - assert.Equal(t, hostOnNetwork, pt.GetServerName("172.0.0.1")) -} - -// Test the PublisherType UpdateTopologyPeriodically() method. -func TestPublisherTypeUpdateTopologyPeriodically(t *testing.T) { - // Setup. - c := make(chan time.Time, 1) - topo := testTopology{ - hostname: hostOnNetwork, - publishName: make(chan string, 1), - publishLocalAddrs: make(chan []string, 1), - } - pt := &BeatPublisher{ - name: shipperName, - RefreshTopologyTimer: c, - TopologyOutput: topo, - } - - // Simulate a single clock tick and close the channel. - c <- time.Now() - close(c) - pt.UpdateTopologyPeriodically() - - // Validate that PublishTopology was invoked. - assert.Equal(t, shipperName, <-topo.publishName) - switch runtime.GOOS { - default: - assert.True(t, len(<-topo.publishLocalAddrs) > 0) - case "nacl", "plan9", "solaris": - t.Skipf("Golang's net.InterfaceAddrs is a stub on %s", runtime.GOOS) - } -} diff --git a/packetbeat/docs/faq.asciidoc b/packetbeat/docs/faq.asciidoc index 00fb283ec6d..24ae6cdc4bb 100644 --- a/packetbeat/docs/faq.asciidoc +++ b/packetbeat/docs/faq.asciidoc @@ -4,16 +4,6 @@ This section contains frequently asked questions about Packetbeat. Also check out the https://discuss.elastic.co/c/beats/packetbeat[Packetbeat discussion forum]. -[float] -[[client-server-fields-empty]] -=== Client_server and server fields are empty? - -The `client_server` and `server` fields are empty when Packetbeat is not configured -to capture information about the network topology. - -To capture information about the network topology, set the `save_topology` configuration option to true and make sure that -you are sending the output to Elasticsearch. - [float] [[dashboard-fields-incorrect]] === Dashboard in Kibana is breaking up data fields incorrectly? @@ -38,8 +28,8 @@ For example: `ip link set enp5s0f1 promisc on` === Packetbeat can't capture traffic from Windows loopback interface? Packetbeat is unable to capture traffic from the loopback device (127.0.0.1 traffic) -because the Windows TCP/IP stack does not implement a network loopback interface, -making it difficult for Windows packet capture drivers like WinPcap to sniff traffic. +because the Windows TCP/IP stack does not implement a network loopback interface, +making it difficult for Windows packet capture drivers like WinPcap to sniff traffic. As a workaround, you can try installing https://github.com/nmap/npcap/releases[Npcap], an update of WinPcap. Make sure that you restart Windows after installing Npcap. @@ -64,11 +54,11 @@ PS C:\Users\vagrant\Desktop\packetbeat-1.2.0-windows> .\packetbeat.exe -devices === Packetbeat is missing long running transactions? Packetbeat has an internal timeout that it uses to time out transactions and TCP connections -when no packets have been seen for a long time. +when no packets have been seen for a long time. To process long running transactions, you can specify a larger value for the <> option. However, keep in mind that very large timeout values can increase memory usage if messages are lost or transaction -response messages are not sent. +response messages are not sent. include::../../libbeat/docs/faq-limit-bandwidth.asciidoc[] include::../../libbeat/docs/shared-faq.asciidoc[] diff --git a/packetbeat/docs/maintaining-topology.asciidoc b/packetbeat/docs/maintaining-topology.asciidoc deleted file mode 100644 index 802a5849be9..00000000000 --- a/packetbeat/docs/maintaining-topology.asciidoc +++ /dev/null @@ -1,22 +0,0 @@ -[[maintaining-topology]] -== Maintaining the Real-Time State of the Network Topology - -deprecated[5.0.0] - -One important feature of Packetbeat is that it knows the name of the source and -destination servers for each transaction. It does this without needing to maintain -a central configuration. Instead, each Beat notes the hostname of the server -where the Beat runs, and maps the hostname to the list of IP addresses of that server. - -Packetbeat stores the topology information in an Elasticsearch index, so to save -the network topology, you need to use Elasticsearch as output and set the -`save_topology` configuration option to true. - -For example: - -[source,yaml] ------------------------------------------------------------------------------- -output.elasticsearch: - hosts: ["localhost:9200"] - save_topology: true ------------------------------------------------------------------------------- diff --git a/packetbeat/docs/reference/configuration.asciidoc b/packetbeat/docs/reference/configuration.asciidoc index 18f1b8d6716..4020ba60be5 100644 --- a/packetbeat/docs/reference/configuration.asciidoc +++ b/packetbeat/docs/reference/configuration.asciidoc @@ -25,12 +25,3 @@ configuration settings, you need to restart {beatname_uc} to pick up the changes * <> * <> * <> - -NOTE: Packetbeat maintains a real-time topology map of all the servers in your network. -See <> for more details. - -include::configuration/packetbeat-options.asciidoc[] - - - - diff --git a/winlogbeat/config/config.go b/winlogbeat/config/config.go index 736dcd83814..895f1869d0d 100644 --- a/winlogbeat/config/config.go +++ b/winlogbeat/config/config.go @@ -46,7 +46,7 @@ func (s Settings) Validate() error { validKeys := []string{ "fields", "fields_under_root", "tags", - "name", "refresh_topology_freq", "topology_expire", "geoip", + "name", "geoip", "queue_size", "bulk_queue_size", "max_procs", "processors", "logging", "output", "path", "winlogbeat", "dashboards", diff --git a/winlogbeat/config/config_test.go b/winlogbeat/config/config_test.go index ccaa571e045..2d8f44a6127 100644 --- a/winlogbeat/config/config_test.go +++ b/winlogbeat/config/config_test.go @@ -48,7 +48,7 @@ func TestConfigValidate(t *testing.T) { }, "1 error: Invalid top-level key 'other' found. Valid keys are bulk_queue_size, dashboards, " + "fields, fields_under_root, geoip, logging, max_procs, " + - "name, output, path, processors, queue_size, refresh_topology_freq, tags, topology_expire, winlogbeat", + "name, output, path, processors, queue_size, tags, winlogbeat", }, { WinlogbeatConfig{},