diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 6ad302d668e47..e732f2871f0ee 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -10,6 +10,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/azure_storage_queue" _ "github.com/influxdata/telegraf/plugins/inputs/bcache" _ "github.com/influxdata/telegraf/plugins/inputs/beanstalkd" + _ "github.com/influxdata/telegraf/plugins/inputs/beat" _ "github.com/influxdata/telegraf/plugins/inputs/bind" _ "github.com/influxdata/telegraf/plugins/inputs/bond" _ "github.com/influxdata/telegraf/plugins/inputs/burrow" diff --git a/plugins/inputs/beat/README.md b/plugins/inputs/beat/README.md new file mode 100644 index 0000000000000..113187acda585 --- /dev/null +++ b/plugins/inputs/beat/README.md @@ -0,0 +1,143 @@ +# Beat Plugin +The Beat plugin will collect metrics from the given Beat instances. It is +known to work with Filebeat and Kafkabeat. +### Configuration: +```toml + ## An URL from which to read beat-formatted JSON + ## Default is "http://127.0.0.1:5066". + url = "http://127.0.0.1:5066" + + ## Enable collection of the listed stats + ## An empty list means collect all. Available options are currently + ## "beat", "libbeat", "system" and "filebeat". + # include = ["beat", "libbeat", "filebeat"] + + ## HTTP method + # method = "GET" + + ## Optional HTTP headers + # headers = {"X-Special-Header" = "Special-Value"} + + ## Override HTTP "Host" header + # host_header = "logstash.example.com" + + ## Timeout for HTTP requests + # timeout = "5s" + + ## Optional HTTP Basic Auth credentials + # username = "username" + # password = "pa$$word" + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false +``` +### Measurements & Fields +- **beat** + * Fields: + - cpu_system_ticks + - cpu_system_time_ms + - cpu_total_ticks + - cpu_total_time_ms + - cpu_total_value + - cpu_user_ticks + - cpu_user_time_ms + - info_uptime_ms + - memstats_gc_next + - memstats_memory_alloc + - memstats_memory_total + - memstats_rss + * Tags: + - beat_beat + - beat_host + - beat_id + - beat_name + - beat_version + +- **beat_filebeat** + * Fields: + - events_active + - events_added + - events_done + - harvester_closed + - harvester_open_files + - harvester_running + - harvester_skipped + - harvester_started + - input_log_files_renamed + - input_log_files_truncated + * Tags: + - beat_beat + - beat_host + - beat_id + - beat_name + - beat_version + +- **beat_libbeat** + * Fields: + - config_module_running + - config_module_starts + - config_module_stops + - config_reloads + - output_events_acked + - output_events_active + - output_events_batches + - output_events_dropped + - output_events_duplicates + - output_events_failed + - output_events_total + - output_type + - output_read_bytes + - output_read_errors + - output_write_bytes + - output_write_errors + - outputs_kafka_bytes_read + - outputs_kafka_bytes_write + - pipeline_clients + - pipeline_events_active + - pipeline_events_dropped + - pipeline_events_failed + - pipeline_events_filtered + - pipeline_events_published + - pipeline_events_retry + - pipeline_events_total + - pipeline_queue_acked + * Tags: + - beat_beat + - beat_host + - beat_id + - beat_name + - beat_version + +- **beat_system** + * Field: + - cpu_cores + - load_1 + - load_15 + - load_5 + - load_norm_1 + - load_norm_15 + - load_norm_5 + * Tags: + - beat_beat + - beat_host + - beat_id + - beat_name + - beat_version + +### Example Output: +``` +$ telegraf --input-filter beat --test + +> beat,beat_beat=filebeat,beat_host=node-6,beat_id=9c1c8697-acb4-4df0-987d-28197814f788,beat_name=node-6-test,beat_version=6.4.2,host=node-6 + cpu_system_ticks=656750,cpu_system_time_ms=656750,cpu_total_ticks=5461190,cpu_total_time_ms=5461198,cpu_total_value=5461190,cpu_user_ticks=4804440,cpu_user_time_ms=4804448,info_uptime_ms=342634196,memstats_gc_next=20199584,memstats_memory_alloc=12547424,memstats_memory_total=486296424792,memstats_rss=72552448 1540316047000000000 +> beat_libbeat,beat_beat=filebeat,beat_host=node-6,beat_id=9c1c8697-acb4-4df0-987d-28197814f788,beat_name=node-6-test,beat_version=6.4.2,host=node-6 + config_module_running=0,config_module_starts=0,config_module_stops=0,config_reloads=0,output_events_acked=192404,output_events_active=0,output_events_batches=1607,output_events_dropped=0,output_events_duplicates=0,output_events_failed=0,output_events_total=192404,output_read_bytes=0,output_read_errors=0,output_write_bytes=0,output_write_errors=0,outputs_kafka_bytes_read=1118528,outputs_kafka_bytes_write=48002014,pipeline_clients=1,pipeline_events_active=0,pipeline_events_dropped=0,pipeline_events_failed=0,pipeline_events_filtered=11496,pipeline_events_published=192404,pipeline_events_retry=14,pipeline_events_total=203900,pipeline_queue_acked=192404 1540316047000000000 +> beat_system,beat_beat=filebeat,beat_host=node-6,beat_id=9c1c8697-acb4-4df0-987d-28197814f788,beat_name=node-6-test,beat_version=6.4.2,host=node-6 + cpu_cores=32,load_1=46.08,load_15=49.82,load_5=47.88,load_norm_1=1.44,load_norm_15=1.5569,load_norm_5=1.4963 1540316047000000000 +> beat_filebeat,beat_beat=filebeat,beat_host=node-6,beat_id=9c1c8697-acb4-4df0-987d-28197814f788,beat_name=node-6-test,beat_version=6.4.2,host=node-6 + events_active=0,events_added=3223,events_done=3223,harvester_closed=0,harvester_open_files=0,harvester_running=0,harvester_skipped=0,harvester_started=0,input_log_files_renamed=0,input_log_files_truncated=0 1540320286000000000 +``` diff --git a/plugins/inputs/beat/beat.go b/plugins/inputs/beat/beat.go new file mode 100644 index 0000000000000..017b4c27e340a --- /dev/null +++ b/plugins/inputs/beat/beat.go @@ -0,0 +1,234 @@ +package beat + +import ( + "encoding/json" + "fmt" + "net/http" + "net/url" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/choice" + "github.com/influxdata/telegraf/plugins/common/tls" + "github.com/influxdata/telegraf/plugins/inputs" + + jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" +) + +const sampleConfig = ` + ## An URL from which to read Beat-formatted JSON + ## Default is "http://127.0.0.1:5066". + url = "http://127.0.0.1:5066" + + ## Enable collection of the listed stats + ## An empty list means collect all. Available options are currently + ## "beat", "libbeat", "system" and "filebeat". + # include = ["beat", "libbeat", "filebeat"] + + ## HTTP method + # method = "GET" + + ## Optional HTTP headers + # headers = {"X-Special-Header" = "Special-Value"} + + ## Override HTTP "Host" header + # host_header = "logstash.example.com" + + ## Timeout for HTTP requests + # timeout = "5s" + + ## Optional HTTP Basic Auth credentials + # username = "username" + # password = "pa$$word" + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false +` + +const description = "Read metrics exposed by Beat" + +const suffixInfo = "/" +const suffixStats = "/stats" + +type BeatInfo struct { + Beat string `json:"beat"` + Hostname string `json:"hostname"` + Name string `json:"name"` + UUID string `json:"uuid"` + Version string `json:"version"` +} + +type BeatStats struct { + Beat map[string]interface{} `json:"beat"` + FileBeat interface{} `json:"filebeat"` + Libbeat interface{} `json:"libbeat"` + System interface{} `json:"system"` +} + +type Beat struct { + URL string `toml:"url"` + + Includes []string `toml:"include"` + + Username string `toml:"username"` + Password string `toml:"password"` + Method string `toml:"method"` + Headers map[string]string `toml:"headers"` + HostHeader string `toml:"host_header"` + Timeout internal.Duration `toml:"timeout"` + + tls.ClientConfig + client *http.Client +} + +func NewBeat() *Beat { + return &Beat{ + URL: "http://127.0.0.1:5066", + Includes: []string{"beat", "libbeat", "filebeat"}, + Method: "GET", + Headers: make(map[string]string), + Timeout: internal.Duration{Duration: time.Second * 5}, + } +} + +func (beat *Beat) Init() error { + availableStats := []string{"beat", "libbeat", "system", "filebeat"} + + var err error + beat.client, err = beat.createHTTPClient() + + if err != nil { + return err + } + + err = choice.CheckSlice(beat.Includes, availableStats) + if err != nil { + return err + } + + return nil +} + +func (beat *Beat) Description() string { + return description +} + +func (beat *Beat) SampleConfig() string { + return sampleConfig +} + +// createHTTPClient create a clients to access API +func (beat *Beat) createHTTPClient() (*http.Client, error) { + tlsConfig, err := beat.ClientConfig.TLSConfig() + if err != nil { + return nil, err + } + + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsConfig, + }, + Timeout: beat.Timeout.Duration, + } + + return client, nil +} + +// gatherJSONData query the data source and parse the response JSON +func (beat *Beat) gatherJSONData(url string, value interface{}) error { + request, err := http.NewRequest(beat.Method, url, nil) + if err != nil { + return err + } + + if beat.Username != "" { + request.SetBasicAuth(beat.Username, beat.Password) + } + for k, v := range beat.Headers { + request.Header.Add(k, v) + } + if beat.HostHeader != "" { + request.Host = beat.HostHeader + } + + response, err := beat.client.Do(request) + if err != nil { + return err + } + + defer response.Body.Close() + + return json.NewDecoder(response.Body).Decode(value) +} + +func (beat *Beat) Gather(accumulator telegraf.Accumulator) error { + beatStats := &BeatStats{} + beatInfo := &BeatInfo{} + + infoUrl, err := url.Parse(beat.URL + suffixInfo) + if err != nil { + return err + } + statsUrl, err := url.Parse(beat.URL + suffixStats) + if err != nil { + return err + } + + err = beat.gatherJSONData(infoUrl.String(), beatInfo) + if err != nil { + return err + } + tags := map[string]string{ + "beat_beat": beatInfo.Beat, + "beat_id": beatInfo.UUID, + "beat_name": beatInfo.Name, + "beat_host": beatInfo.Hostname, + "beat_version": beatInfo.Version, + } + + err = beat.gatherJSONData(statsUrl.String(), beatStats) + if err != nil { + return err + } + + for _, name := range beat.Includes { + var stats interface{} + var metric string + + switch name { + case "beat": + stats = beatStats.Beat + metric = "beat" + case "filebeat": + stats = beatStats.FileBeat + metric = "beat_filebeat" + case "system": + stats = beatStats.System + metric = "beat_system" + case "libbeat": + stats = beatStats.Libbeat + metric = "beat_libbeat" + default: + return fmt.Errorf("unknown stats-type %q", name) + } + flattener := jsonparser.JSONFlattener{} + err := flattener.FullFlattenJSON("", stats, true, true) + if err != nil { + return err + } + accumulator.AddFields(metric, flattener.Fields, tags) + } + + return nil +} + +func init() { + inputs.Add("beat", func() telegraf.Input { + return NewBeat() + }) +} diff --git a/plugins/inputs/beat/beat6_info.json b/plugins/inputs/beat/beat6_info.json new file mode 100644 index 0000000000000..3cc318c330447 --- /dev/null +++ b/plugins/inputs/beat/beat6_info.json @@ -0,0 +1,7 @@ +{ + "beat": "filebeat", + "hostname": "node-6", + "name": "node-6-test", + "uuid": "9c1c8697-acb4-4df0-987d-28197814f785", + "version": "6.4.2" +} diff --git a/plugins/inputs/beat/beat6_stats.json b/plugins/inputs/beat/beat6_stats.json new file mode 100644 index 0000000000000..f34b9d1f06d1e --- /dev/null +++ b/plugins/inputs/beat/beat6_stats.json @@ -0,0 +1,137 @@ +{ + "beat": { + "cpu": { + "system": { + "ticks": 626970, + "time": { + "ms": 626972 + } + }, + "total": { + "ticks": 5215010, + "time": { + "ms": 5215018 + }, + "value": 5215010 + }, + "user": { + "ticks": 4588040, + "time": { + "ms": 4588046 + } + } + }, + "info": { + "ephemeral_id": "809e3b63-4fa0-4f74-822a-8e3c08298336", + "uptime": { + "ms": 327248661 + } + }, + "memstats": { + "gc_next": 20611808, + "memory_alloc": 12692544, + "memory_total": 462910102088, + "rss": 80273408 + } + }, + "filebeat": { + "events": { + "active": 0, + "added": 182990, + "done": 182990 + }, + "harvester": { + "closed": 2222, + "open_files": 4, + "running": 4, + "skipped": 0, + "started": 2226 + }, + "input": { + "log": { + "files": { + "renamed": 0, + "truncated": 0 + } + } + } + }, + "libbeat": { + "config": { + "module": { + "running": 0, + "starts": 0, + "stops": 0 + }, + "reloads": 0 + }, + "output": { + "events": { + "acked": 172067, + "active": 0, + "batches": 1490, + "dropped": 0, + "duplicates": 0, + "failed": 0, + "total": 172067 + }, + "read": { + "bytes": 0, + "errors": 0 + }, + "type": "kafka", + "write": { + "bytes": 0, + "errors": 0 + } + }, + "outputs": { + "kafka": { + "bytes_read": 1048670, + "bytes_write": 43136887 + } + }, + "pipeline": { + "clients": 1, + "events": { + "active": 0, + "dropped": 0, + "failed": 0, + "filtered": 10923, + "published": 172067, + "retry": 14, + "total": 182990 + }, + "queue": { + "acked": 172067 + } + } + }, + "registrar": { + "states": { + "cleanup": 3446, + "current": 16409, + "update": 182990 + }, + "writes": { + "fail": 0, + "success": 11718, + "total": 11718 + } + }, + "system": { + "cpu": { + "cores": 32 + }, + "load": { + "1": 32.49, + "15": 41.9, + "5": 40.16, + "norm": { + "1": 1.0153, + "15": 1.3094, + "5": 1.255 + } + } + } +} diff --git a/plugins/inputs/beat/beat_test.go b/plugins/inputs/beat/beat_test.go new file mode 100644 index 0000000000000..30dd48569f3a6 --- /dev/null +++ b/plugins/inputs/beat/beat_test.go @@ -0,0 +1,228 @@ +package beat + +import ( + "fmt" + "io/ioutil" + "net" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" +) + +func Test_BeatStats(test *testing.T) { + var beat6StatsAccumulator testutil.Accumulator + var beatTest = NewBeat() + // System stats are disabled by default + beatTest.Includes = []string{"beat", "libbeat", "system", "filebeat"} + err := beatTest.Init() + if err != nil { + panic(fmt.Sprintf("could not init beat: %s", err)) + } + fakeServer := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, request *http.Request) { + var jsonFilePath string + + switch request.URL.Path { + case suffixInfo: + jsonFilePath = "beat6_info.json" + case suffixStats: + jsonFilePath = "beat6_stats.json" + default: + panic("Cannot handle request") + } + + data, err := ioutil.ReadFile(jsonFilePath) + + if err != nil { + panic(fmt.Sprintf("could not read from data file %s", jsonFilePath)) + } + w.Write(data) + })) + requestURL, err := url.Parse(beatTest.URL) + if err != nil { + test.Logf("Can't parse URL %s", beatTest.URL) + } + fakeServer.Listener, err = net.Listen("tcp", fmt.Sprintf("%s:%s", requestURL.Hostname(), requestURL.Port())) + if err != nil { + test.Logf("Can't listen for %s: %v", requestURL, err) + } + + fakeServer.Start() + defer fakeServer.Close() + + err = beatTest.Gather(&beat6StatsAccumulator) + if err != nil { + test.Logf("Can't gather stats") + } + + beat6StatsAccumulator.AssertContainsTaggedFields( + test, + "beat", + map[string]interface{}{ + "cpu_system_ticks": float64(626970), + "cpu_system_time_ms": float64(626972), + "cpu_total_ticks": float64(5215010), + "cpu_total_time_ms": float64(5215018), + "cpu_total_value": float64(5215010), + "cpu_user_ticks": float64(4588040), + "cpu_user_time_ms": float64(4588046), + "info_uptime_ms": float64(327248661), + "info_ephemeral_id": "809e3b63-4fa0-4f74-822a-8e3c08298336", + "memstats_gc_next": float64(20611808), + "memstats_memory_alloc": float64(12692544), + "memstats_memory_total": float64(462910102088), + "memstats_rss": float64(80273408), + }, + map[string]string{ + "beat_beat": string("filebeat"), + "beat_host": string("node-6"), + "beat_id": string("9c1c8697-acb4-4df0-987d-28197814f785"), + "beat_name": string("node-6-test"), + "beat_version": string("6.4.2"), + }, + ) + beat6StatsAccumulator.AssertContainsTaggedFields( + test, + "beat_filebeat", + map[string]interface{}{ + "events_active": float64(0), + "events_added": float64(182990), + "events_done": float64(182990), + "harvester_closed": float64(2222), + "harvester_open_files": float64(4), + "harvester_running": float64(4), + "harvester_skipped": float64(0), + "harvester_started": float64(2226), + "input_log_files_renamed": float64(0), + "input_log_files_truncated": float64(0), + }, + map[string]string{ + "beat_beat": string("filebeat"), + "beat_host": string("node-6"), + "beat_id": string("9c1c8697-acb4-4df0-987d-28197814f785"), + "beat_name": string("node-6-test"), + "beat_version": string("6.4.2"), + }, + ) + beat6StatsAccumulator.AssertContainsTaggedFields( + test, + "beat_libbeat", + map[string]interface{}{ + "config_module_running": float64(0), + "config_module_starts": float64(0), + "config_module_stops": float64(0), + "config_reloads": float64(0), + "output_type": "kafka", + "output_events_acked": float64(172067), + "output_events_active": float64(0), + "output_events_batches": float64(1490), + "output_events_dropped": float64(0), + "output_events_duplicates": float64(0), + "output_events_failed": float64(0), + "output_events_total": float64(172067), + "output_read_bytes": float64(0), + "output_read_errors": float64(0), + "output_write_bytes": float64(0), + "output_write_errors": float64(0), + "outputs_kafka_bytes_read": float64(1048670), + "outputs_kafka_bytes_write": float64(43136887), + "pipeline_clients": float64(1), + "pipeline_events_active": float64(0), + "pipeline_events_dropped": float64(0), + "pipeline_events_failed": float64(0), + "pipeline_events_filtered": float64(10923), + "pipeline_events_published": float64(172067), + "pipeline_events_retry": float64(14), + "pipeline_events_total": float64(182990), + "pipeline_queue_acked": float64(172067), + }, + map[string]string{ + "beat_beat": string("filebeat"), + "beat_host": string("node-6"), + "beat_id": string("9c1c8697-acb4-4df0-987d-28197814f785"), + "beat_name": string("node-6-test"), + "beat_version": string("6.4.2"), + }, + ) + beat6StatsAccumulator.AssertContainsTaggedFields( + test, + "beat_system", + map[string]interface{}{ + "cpu_cores": float64(32), + "load_1": float64(32.49), + "load_15": float64(41.9), + "load_5": float64(40.16), + "load_norm_1": float64(1.0153), + "load_norm_15": float64(1.3094), + "load_norm_5": float64(1.255), + }, + map[string]string{ + "beat_beat": string("filebeat"), + "beat_host": string("node-6"), + "beat_id": string("9c1c8697-acb4-4df0-987d-28197814f785"), + "beat_name": string("node-6-test"), + "beat_version": string("6.4.2"), + }, + ) +} + +func Test_BeatRequest(test *testing.T) { + var beat6StatsAccumulator testutil.Accumulator + beatTest := NewBeat() + // System stats are disabled by default + beatTest.Includes = []string{"beat", "libbeat", "system", "filebeat"} + err := beatTest.Init() + if err != nil { + panic(fmt.Sprintf("could not init beat: %s", err)) + } + fakeServer := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, request *http.Request) { + var jsonFilePath string + + switch request.URL.Path { + case suffixInfo: + jsonFilePath = "beat6_info.json" + case suffixStats: + jsonFilePath = "beat6_stats.json" + default: + panic("Cannot handle request") + } + + data, err := ioutil.ReadFile(jsonFilePath) + + if err != nil { + panic(fmt.Sprintf("could not read from data file %s", jsonFilePath)) + } + assert.Equal(test, request.Host, "beat.test.local") + assert.Equal(test, request.Method, "POST") + assert.Equal(test, request.Header.Get("Authorization"), "Basic YWRtaW46UFdE") + assert.Equal(test, request.Header.Get("X-Test"), "test-value") + + w.Write(data) + })) + + requestURL, err := url.Parse(beatTest.URL) + if err != nil { + test.Logf("Can't parse URL %s", beatTest.URL) + } + fakeServer.Listener, err = net.Listen("tcp", fmt.Sprintf("%s:%s", requestURL.Hostname(), requestURL.Port())) + if err != nil { + test.Logf("Can't listen for %s: %v", requestURL, err) + } + fakeServer.Start() + defer fakeServer.Close() + + beatTest.Headers["X-Test"] = "test-value" + beatTest.HostHeader = "beat.test.local" + beatTest.Method = "POST" + beatTest.Username = "admin" + beatTest.Password = "PWD" + + err = beatTest.Gather(&beat6StatsAccumulator) + if err != nil { + test.Logf("Can't gather stats") + } + +}