diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 9c183fcbbe9cc..197fe2b1cbe07 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -67,6 +67,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/leofs" _ "github.com/influxdata/telegraf/plugins/inputs/linux_sysctl_fs" _ "github.com/influxdata/telegraf/plugins/inputs/logparser" + _ "github.com/influxdata/telegraf/plugins/inputs/logstash" _ "github.com/influxdata/telegraf/plugins/inputs/lustre2" _ "github.com/influxdata/telegraf/plugins/inputs/mailchimp" _ "github.com/influxdata/telegraf/plugins/inputs/mcrouter" diff --git a/plugins/inputs/logstash/README.md b/plugins/inputs/logstash/README.md new file mode 100644 index 0000000000000..dca10e0ad9d0f --- /dev/null +++ b/plugins/inputs/logstash/README.md @@ -0,0 +1,187 @@ +# Logstash Input Plugin + +This plugin reads metrics exposed by [Logstash Monitoring API](https://www.elastic.co/guide/en/logstash/current/monitoring-logstash.html). + +### Configuration: + +```toml + ## This plugin reads metrics exposed by Logstash Monitoring API. + ## https://www.elastic.co/guide/en/logstash/current/monitoring.html + + ## The URL of the exposed Logstash API endpoint + url = "http://127.0.0.1:9600" + + ## Enable Logstash 6+ multi-pipeline statistics support + multi_pipeline = true + + ## Should the general process statistics be gathered + collect_process_stats = true + + ## Should the JVM specific statistics be gathered + collect_jvm_stats = true + + ## Should the event pipelines statistics be gathered + collect_pipelines_stats = true + + ## Should the plugin statistics be gathered + collect_plugins_stats = true + + ## Should the queue statistics be gathered + collect_queue_stats = true + + ## HTTP method + # method = "GET" + + ## Optional HTTP headers + # headers = {"X-Special-Header" = "Special-Value"} + + ## Override HTTP "Host" header + # host_header = "logstash.example.com" + + ## Timeout for HTTP requests + timeout = "5s" + + ## Optional HTTP Basic Auth credentials + # username = "username" + # password = "pa$$word" + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false +``` + +### Measurements & Fields: + +- **logstash_jvm** + * Fields: + - threads_peak_count + - mem_pools_survivor_peak_max_in_bytes + - mem_pools_survivor_max_in_bytes + - mem_pools_old_peak_used_in_bytes + - mem_pools_young_used_in_bytes + - mem_non_heap_committed_in_bytes + - threads_count + - mem_pools_old_committed_in_bytes + - mem_pools_young_peak_max_in_bytes + - mem_heap_used_percent + - gc_collectors_young_collection_time_in_millis + - mem_pools_survivor_peak_used_in_bytes + - mem_pools_young_committed_in_bytes + - gc_collectors_old_collection_time_in_millis + - gc_collectors_old_collection_count + - mem_pools_survivor_used_in_bytes + - mem_pools_old_used_in_bytes + - mem_pools_young_max_in_bytes + - mem_heap_max_in_bytes + - mem_non_heap_used_in_bytes + - mem_pools_survivor_committed_in_bytes + - mem_pools_old_max_in_bytes + - mem_heap_committed_in_bytes + - mem_pools_old_peak_max_in_bytes + - mem_pools_young_peak_used_in_bytes + - mem_heap_used_in_bytes + - gc_collectors_young_collection_count + - uptime_in_millis + * Tags: + - node_id + - node_name + - node_host + - node_version + +- **logstash_process** + * Fields: + - open_file_descriptors + - cpu_load_average_1m + - cpu_load_average_5m + - cpu_load_average_15m + - cpu_total_in_millis + - cpu_percent + - peak_open_file_descriptors + - max_file_descriptors + - mem_total_virtual_in_bytes + - mem_total_virtual_in_bytes + * Tags: + - node_id + - node_name + - node_host + - node_version + +- **logstash_events** + * Fields: + - queue_push_duration_in_millis + - duration_in_millis + - in + - filtered + - out + * Tags: + - node_id + - node_name + - node_host + - node_version + - pipeline (for Logstash 6 only) + +- **logstash_plugins** + * Fields: + - queue_push_duration_in_millis (for input plugins only) + - duration_in_millis + - in + - out + * Tags: + - node_id + - node_name + - node_host + - node_version + - pipeline (for Logstash 6 only) + - plugin_id + - plugin_name + - plugin_type + +- **logstash_queue** + * Fields: + - events + - free_space_in_bytes + - max_queue_size_in_bytes + - max_unread_events + - page_capacity_in_bytes + - queue_size_in_bytes + * Tags: + - node_id + - node_name + - node_host + - node_version + - pipeline (for Logstash 6 only) + - queue_type + +### Tags description + +- node_id - The uuid of the logstash node. Randomly generated. +- node_name - The name of the logstash node. Can be defined in the *logstash.yml* or defaults to the hostname. + Can be used to break apart metrics from different logstash instances of the same host. +- node_host - The hostname of the logstash node. + Can be different from the telegraf's host if a remote connection to logstash instance is used. +- node_version - The version of logstash service running on this node. +- pipeline (for Logstash 6 only) - The name of a pipeline if multi-pipeline is configured. + Will defaults to "main" if there is only one pipeline and will be missing for logstash 5. +- plugin_id - The unique id of this plugin. + It will be a randomly generated string unless it's defined in the logstash pipeline config file. +- plugin_name - The name of this plugin. i.e. file, elasticsearch, date, mangle. +- plugin_type - The type of this plugin i.e. input/filter/output. + +### Example Output: + +``` +$ ./telegraf -config telegraf.conf -input-filter logstash -test + +> logstash_jvm,host=node-6,node_host=node-6,node_id=3044f675-21ce-4335-898a-8408aa678245,node_name=node-6-test,node_version=6.4.2 + gc_collectors_old_collection_count=5,gc_collectors_old_collection_time_in_millis=702,gc_collectors_young_collection_count=95,gc_collectors_young_collection_time_in_millis=4772,mem_heap_committed_in_bytes=360804352,mem_heap_max_in_bytes=8389328896,mem_heap_used_in_bytes=252629768,mem_heap_used_percent=3,mem_non_heap_committed_in_bytes=212144128,mem_non_heap_used_in_bytes=188726024,mem_pools_old_committed_in_bytes=280260608,mem_pools_old_max_in_bytes=6583418880,mem_pools_old_peak_max_in_bytes=6583418880,mem_pools_old_peak_used_in_bytes=235352976,mem_pools_old_used_in_bytes=194754608,mem_pools_survivor_committed_in_bytes=8912896,mem_pools_survivor_max_in_bytes=200605696,mem_pools_survivor_peak_max_in_bytes=200605696,mem_pools_survivor_peak_used_in_bytes=8912896,mem_pools_survivor_used_in_bytes=4476680,mem_pools_young_committed_in_bytes=71630848,mem_pools_young_max_in_bytes=1605304320,mem_pools_young_peak_max_in_bytes=1605304320,mem_pools_young_peak_used_in_bytes=71630848,mem_pools_young_used_in_bytes=53398480,threads_count=60,threads_peak_count=62,uptime_in_millis=10469014 1540289864000000000 +> logstash_process,host=node-6,node_host=node-6,node_id=3044f675-21ce-4335-898a-8408aa678245,node_name=node-6-test,node_version=6.4.2 + cpu_load_average_15m=39.84,cpu_load_average_1m=32.87,cpu_load_average_5m=39.23,cpu_percent=0,cpu_total_in_millis=389920,max_file_descriptors=262144,mem_total_virtual_in_bytes=17921355776,open_file_descriptors=132,peak_open_file_descriptors=140 1540289864000000000 +> logstash_events,host=node-6,node_host=node-6,node_id=3044f675-21ce-4335-898a-8408aa678245,node_name=node-6-test,node_version=6.4.2,pipeline=main + duration_in_millis=175144,filtered=4543,in=4543,out=4543,queue_push_duration_in_millis=19 1540289864000000000 +> logstash_plugins,host=node-6,node_host=node-6,node_id=3044f675-21ce-4335-898a-8408aa678245,node_name=node-6-test,node_version=6.4.2,pipeline=main,plugin_id=input-kafka,plugin_name=kafka,plugin_type=input + duration_in_millis=0,in=0,out=4543,queue_push_duration_in_millis=19 1540289864000000000 +``` diff --git a/plugins/inputs/logstash/logstash.go b/plugins/inputs/logstash/logstash.go new file mode 100644 index 0000000000000..ba25fafd59a17 --- /dev/null +++ b/plugins/inputs/logstash/logstash.go @@ -0,0 +1,506 @@ +package logstash + +import ( + "encoding/json" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/tls" + "github.com/influxdata/telegraf/plugins/inputs" + "net/http" + "net/url" + "time" + + jsonParser "github.com/influxdata/telegraf/plugins/parsers/json" +) + +// ##### Interface ##### + +const sampleConfig = ` + ## This plugin reads metrics exposed by Logstash Monitoring API. + ## https://www.elastic.co/guide/en/logstash/current/monitoring.html + + ## The URL of the exposed Logstash API endpoint + url = "http://127.0.0.1:9600" + + ## Enable Logstash 6+ multi-pipeline statistics support + multi_pipeline = true + + ## Should the general process statistics be gathered + collect_process_stats = true + + ## Should the JVM specific statistics be gathered + collect_jvm_stats = true + + ## Should the event pipelines statistics be gathered + collect_pipelines_stats = true + + ## Should the plugin statistics be gathered + collect_plugins_stats = true + + ## Should the queue statistics be gathered + collect_queue_stats = true + + ## HTTP method + # method = "GET" + + ## Optional HTTP headers + # headers = {"X-Special-Header" = "Special-Value"} + + ## Override HTTP "Host" header + # host_header = "logstash.example.com" + + ## Timeout for HTTP requests + timeout = "5s" + + ## Optional HTTP Basic Auth credentials + # username = "username" + # password = "pa$$word" + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false +` + +type Logstash struct { + URL string `toml:"url"` + + MultiPipeline bool `toml:"multi_pipeline"` + CollectProcessStats bool `toml:"collect_process_stats"` + CollectJVMStats bool `toml:"collect_jvm_stats"` + CollectPipelinesStats bool `toml:"collect_pipelines_stats"` + CollectPluginsStats bool `toml:"collect_plugins_stats"` + CollectQueueStats bool `toml:"collect_queue_stats"` + + Username string `toml:"username"` + Password string `toml:"password"` + Method string `toml:"method"` + Headers map[string]string `toml:"headers"` + HostHeader string `toml:"host_header"` + Timeout internal.Duration `toml:"timeout"` + + tls.ClientConfig + client *http.Client +} + +// NewLogstash create an instance of the plugin with default settings +func NewLogstash() *Logstash { + return &Logstash{ + URL: "http://127.0.0.1:9600", + MultiPipeline: true, + CollectProcessStats: true, + CollectJVMStats: true, + CollectPipelinesStats: true, + CollectPluginsStats: true, + CollectQueueStats: true, + Method: "GET", + Headers: make(map[string]string), + HostHeader: "", + Timeout: internal.Duration{Duration: time.Second * 5}, + } +} + +// init initialise this plugin instance +func init() { + inputs.Add("logstash", func() telegraf.Input { + return NewLogstash() + }) +} + +// Description returns short info about plugin +func (logstash *Logstash) Description() string { + return "Read metrics exposed by Logstash" +} + +// SampleConfig returns details how to configure plugin +func (logstash *Logstash) SampleConfig() string { + return sampleConfig +} + +type ProcessStats struct { + ID string `json:"id"` + Process interface{} `json:"process"` + Name string `json:"name"` + Host string `json:"host"` + Version string `json:"version"` +} + +type JVMStats struct { + ID string `json:"id"` + JVM interface{} `json:"jvm"` + Name string `json:"name"` + Host string `json:"host"` + Version string `json:"version"` +} + +type PipelinesStats struct { + ID string `json:"id"` + Pipelines map[string]Pipeline `json:"pipelines"` + Name string `json:"name"` + Host string `json:"host"` + Version string `json:"version"` +} + +type PipelineStats struct { + ID string `json:"id"` + Pipeline Pipeline `json:"pipeline"` + Name string `json:"name"` + Host string `json:"host"` + Version string `json:"version"` +} + +type Pipeline struct { + Events interface{} `json:"events"` + Plugins PipelinePlugins `json:"plugins"` + Reloads interface{} `json:"reloads"` + Queue PipelineQueue `json:"queue"` +} + +type Plugin struct { + ID string `json:"id"` + Events interface{} `json:"events"` + Name string `json:"name"` +} + +type PipelinePlugins struct { + Inputs []Plugin `json:"inputs"` + Filters []Plugin `json:"filters"` + Outputs []Plugin `json:"outputs"` +} + +type PipelineQueue struct { + Events float64 `json:"events"` + Type string `json:"type"` + Capacity interface{} `json:"capacity"` + Data interface{} `json:"data"` +} + +const jvmStats = "/_node/stats/jvm" +const processStats = "/_node/stats/process" +const pipelinesStats = "/_node/stats/pipelines" +const pipelineStats = "/_node/stats/pipeline" + +// createHttpClient create a clients to access API +func (logstash *Logstash) createHttpClient() (*http.Client, error) { + tlsConfig, err := logstash.ClientConfig.TLSConfig() + if err != nil { + return nil, err + } + + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsConfig, + }, + Timeout: logstash.Timeout.Duration, + } + + return client, nil +} + +// gatherJsonData query the data source and parse the response JSON +func (logstash *Logstash) gatherJsonData(url string, value interface{}) error { + + var method string + if logstash.Method != "" { + method = logstash.Method + } else { + method = "GET" + } + + request, err := http.NewRequest(method, url, nil) + if err != nil { + return err + } + + if (logstash.Username != "") || (logstash.Password != "") { + request.SetBasicAuth(logstash.Username, logstash.Password) + } + for header, value := range logstash.Headers { + request.Header.Add(header, value) + } + if logstash.HostHeader != "" { + request.Host = logstash.HostHeader + } + + response, err := logstash.client.Do(request) + if err != nil { + return err + } + + defer response.Body.Close() + + err = json.NewDecoder(response.Body).Decode(value) + if err != nil { + return err + } + + return nil +} + +// gatherJVMStats gather the JVM metrics and add results to the accumulator +func (logstash *Logstash) gatherJVMStats(url string, accumulator telegraf.Accumulator) error { + jvmStats := &JVMStats{} + + err := logstash.gatherJsonData(url, jvmStats) + if err != nil { + return err + } + + tags := map[string]string{ + "node_id": jvmStats.ID, + "node_name": jvmStats.Name, + "node_host": jvmStats.Host, + "node_version": jvmStats.Version, + } + + flattener := jsonParser.JSONFlattener{} + err = flattener.FlattenJSON("", jvmStats.JVM) + if err != nil { + return err + } + accumulator.AddFields("logstash_jvm", flattener.Fields, tags) + + return nil +} + +// gatherJVMStats gather the Process metrics and add results to the accumulator +func (logstash *Logstash) gatherProcessStats(url string, accumulator telegraf.Accumulator) error { + processStats := &ProcessStats{} + + err := logstash.gatherJsonData(url, processStats) + if err != nil { + return err + } + + tags := map[string]string{ + "node_id": processStats.ID, + "node_name": processStats.Name, + "node_host": processStats.Host, + "node_version": processStats.Version, + } + + flattener := jsonParser.JSONFlattener{} + err = flattener.FlattenJSON("", processStats.Process) + if err != nil { + return err + } + accumulator.AddFields("logstash_process", flattener.Fields, tags) + + return nil +} + +// gatherPluginsStats go through a list of plugins and add their metrics to the accumulator +func (logstash *Logstash) gatherPluginsStats( + plugins []Plugin, + pluginType string, + tags map[string]string, + accumulator telegraf.Accumulator) error { + + for _, plugin := range plugins { + pluginTags := map[string]string{ + "plugin_name": plugin.Name, + "plugin_id": plugin.ID, + "plugin_type": pluginType, + } + for tag, value := range tags { + pluginTags[tag] = value + } + flattener := jsonParser.JSONFlattener{} + err := flattener.FlattenJSON("", plugin.Events) + if err != nil { + return err + } + accumulator.AddFields("logstash_plugins", flattener.Fields, pluginTags) + } + + return nil +} + +func (logstash *Logstash) gatherQueueStats( + queue *PipelineQueue, + tags map[string]string, + accumulator telegraf.Accumulator) error { + + var err error + queueTags := map[string]string{ + "queue_type": queue.Type, + } + for tag, value := range tags { + queueTags[tag] = value + } + + queueFields := map[string]interface{}{ + "events": queue.Events, + } + + if queue.Type != "memory" { + flattener := jsonParser.JSONFlattener{} + err = flattener.FlattenJSON("", queue.Capacity) + if err != nil { + return err + } + err = flattener.FlattenJSON("", queue.Data) + if err != nil { + return err + } + for field, value := range flattener.Fields { + queueFields[field] = value + } + } + + accumulator.AddFields("logstash_queue", queueFields, queueTags) + + return nil +} + +// gatherJVMStats gather the Pipeline metrics and add results to the accumulator (for Logstash < 6) +func (logstash *Logstash) gatherPipelineStats(url string, accumulator telegraf.Accumulator) error { + pipelineStats := &PipelineStats{} + + err := logstash.gatherJsonData(url, pipelineStats) + if err != nil { + return err + } + + tags := map[string]string{ + "node_id": pipelineStats.ID, + "node_name": pipelineStats.Name, + "node_host": pipelineStats.Host, + "node_version": pipelineStats.Version, + } + + flattener := jsonParser.JSONFlattener{} + err = flattener.FlattenJSON("", pipelineStats.Pipeline.Events) + if err != nil { + return err + } + accumulator.AddFields("logstash_events", flattener.Fields, tags) + + if logstash.CollectPluginsStats { + err = logstash.gatherPluginsStats(pipelineStats.Pipeline.Plugins.Inputs, "input", tags, accumulator) + if err != nil { + return err + } + err = logstash.gatherPluginsStats(pipelineStats.Pipeline.Plugins.Filters, "filter", tags, accumulator) + if err != nil { + return err + } + err = logstash.gatherPluginsStats(pipelineStats.Pipeline.Plugins.Outputs, "output", tags, accumulator) + if err != nil { + return err + } + } + + if logstash.CollectQueueStats { + err = logstash.gatherQueueStats(&pipelineStats.Pipeline.Queue, tags, accumulator) + } + + return nil +} + +// gatherJVMStats gather the Pipelines metrics and add results to the accumulator (for Logstash >= 6) +func (logstash *Logstash) gatherPipelinesStats(url string, accumulator telegraf.Accumulator) error { + pipelinesStats := &PipelinesStats{} + + err := logstash.gatherJsonData(url, pipelinesStats) + if err != nil { + return err + } + + for pipelineName, pipeline := range pipelinesStats.Pipelines { + tags := map[string]string{ + "node_id": pipelinesStats.ID, + "node_name": pipelinesStats.Name, + "node_host": pipelinesStats.Host, + "node_version": pipelinesStats.Version, + "pipeline": pipelineName, + } + + flattener := jsonParser.JSONFlattener{} + err := flattener.FlattenJSON("", pipeline.Events) + if err != nil { + return err + } + accumulator.AddFields("logstash_events", flattener.Fields, tags) + + if logstash.CollectPluginsStats { + err = logstash.gatherPluginsStats(pipeline.Plugins.Inputs, "input", tags, accumulator) + if err != nil { + return err + } + err = logstash.gatherPluginsStats(pipeline.Plugins.Filters, "filter", tags, accumulator) + if err != nil { + return err + } + err = logstash.gatherPluginsStats(pipeline.Plugins.Outputs, "output", tags, accumulator) + if err != nil { + return err + } + } + + if logstash.CollectQueueStats { + err = logstash.gatherQueueStats(&pipeline.Queue, tags, accumulator) + } + + } + + return nil +} + +// Gather ask this plugin to start gathering metrics +func (logstash *Logstash) Gather(accumulator telegraf.Accumulator) error { + + if logstash.client == nil { + client, err := logstash.createHttpClient() + + if err != nil { + return err + } + logstash.client = client + } + + if logstash.CollectJVMStats { + jvmUrl, err := url.Parse(logstash.URL + jvmStats) + if err != nil { + return err + } + if err := logstash.gatherJVMStats(jvmUrl.String(), accumulator); err != nil { + return err + } + } + + if logstash.CollectProcessStats { + processUrl, err := url.Parse(logstash.URL + processStats) + if err != nil { + return err + } + if err := logstash.gatherProcessStats(processUrl.String(), accumulator); err != nil { + return err + } + } + + if logstash.CollectPipelinesStats { + if logstash.MultiPipeline { + pipelinesUrl, err := url.Parse(logstash.URL + pipelinesStats) + if err != nil { + return err + } + if err := logstash.gatherPipelinesStats(pipelinesUrl.String(), accumulator); err != nil { + return err + } + } else { + pipelineUrl, err := url.Parse(logstash.URL + pipelineStats) + if err != nil { + return err + } + if err := logstash.gatherPipelineStats(pipelineUrl.String(), accumulator); err != nil { + return err + } + } + } + + return nil +} diff --git a/plugins/inputs/logstash/logstash_test.go b/plugins/inputs/logstash/logstash_test.go new file mode 100644 index 0000000000000..285b39f5b0858 --- /dev/null +++ b/plugins/inputs/logstash/logstash_test.go @@ -0,0 +1,723 @@ +package logstash + +import ( + "fmt" + "net" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" +) + +var logstashTest = NewLogstash() + +var ( + logstash5accPipelineStats testutil.Accumulator + logstash6accPipelinesStats testutil.Accumulator + logstash5accProcessStats testutil.Accumulator + logstash6accProcessStats testutil.Accumulator + logstash5accJVMStats testutil.Accumulator + logstash6accJVMStats testutil.Accumulator +) + +func Test_Logstash5GatherProcessStats(test *testing.T) { + fakeServer := httptest.NewUnstartedServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + writer.Header().Set("Content-Type", "application/json") + fmt.Fprintf(writer, "%s", string(logstash5ProcessJSON)) + })) + requestURL, err := url.Parse(logstashTest.URL) + if err != nil { + test.Logf("Can't connect to: %s", logstashTest.URL) + } + fakeServer.Listener, _ = net.Listen("tcp", fmt.Sprintf("%s:%s", requestURL.Hostname(), requestURL.Port())) + fakeServer.Start() + defer fakeServer.Close() + + if logstashTest.client == nil { + client, err := logstashTest.createHttpClient() + + if err != nil { + test.Logf("Can't createHttpClient") + } + logstashTest.client = client + } + + if err := logstashTest.gatherProcessStats(logstashTest.URL+processStats, &logstash5accProcessStats); err != nil { + test.Logf("Can't gather Process stats") + } + + logstash5accProcessStats.AssertContainsTaggedFields( + test, + "logstash_process", + map[string]interface{}{ + "open_file_descriptors": float64(89.0), + "max_file_descriptors": float64(1.048576e+06), + "cpu_percent": float64(3.0), + "cpu_load_average_5m": float64(0.61), + "cpu_load_average_15m": float64(0.54), + "mem_total_virtual_in_bytes": float64(4.809506816e+09), + "cpu_total_in_millis": float64(1.5526e+11), + "cpu_load_average_1m": float64(0.49), + "peak_open_file_descriptors": float64(100.0), + }, + map[string]string{ + "node_id": string("a360d8cf-6289-429d-8419-6145e324b574"), + "node_name": string("node-5-test"), + "node_host": string("node-5"), + "node_version": string("5.3.0"), + }, + ) +} + +func Test_Logstash6GatherProcessStats(test *testing.T) { + fakeServer := httptest.NewUnstartedServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + writer.Header().Set("Content-Type", "application/json") + fmt.Fprintf(writer, "%s", string(logstash6ProcessJSON)) + })) + requestURL, err := url.Parse(logstashTest.URL) + if err != nil { + test.Logf("Can't connect to: %s", logstashTest.URL) + } + fakeServer.Listener, _ = net.Listen("tcp", fmt.Sprintf("%s:%s", requestURL.Hostname(), requestURL.Port())) + fakeServer.Start() + defer fakeServer.Close() + + if logstashTest.client == nil { + client, err := logstashTest.createHttpClient() + + if err != nil { + test.Logf("Can't createHttpClient") + } + logstashTest.client = client + } + + if err := logstashTest.gatherProcessStats(logstashTest.URL+processStats, &logstash6accProcessStats); err != nil { + test.Logf("Can't gather Process stats") + } + + logstash6accProcessStats.AssertContainsTaggedFields( + test, + "logstash_process", + map[string]interface{}{ + "open_file_descriptors": float64(133.0), + "max_file_descriptors": float64(262144.0), + "cpu_percent": float64(0.0), + "cpu_load_average_5m": float64(42.4), + "cpu_load_average_15m": float64(38.95), + "mem_total_virtual_in_bytes": float64(17923452928.0), + "cpu_total_in_millis": float64(5841460), + "cpu_load_average_1m": float64(48.2), + "peak_open_file_descriptors": float64(145.0), + }, + map[string]string{ + "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), + "node_name": string("node-6-test"), + "node_host": string("node-6"), + "node_version": string("6.4.2"), + }, + ) +} + +func Test_Logstash5GatherPipelineStats(test *testing.T) { + //logstash5accPipelineStats.SetDebug(true) + fakeServer := httptest.NewUnstartedServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + writer.Header().Set("Content-Type", "application/json") + fmt.Fprintf(writer, "%s", string(logstash5PipelineJSON)) + })) + requestURL, err := url.Parse(logstashTest.URL) + if err != nil { + test.Logf("Can't connect to: %s", logstashTest.URL) + } + fakeServer.Listener, _ = net.Listen("tcp", fmt.Sprintf("%s:%s", requestURL.Hostname(), requestURL.Port())) + fakeServer.Start() + defer fakeServer.Close() + + if logstashTest.client == nil { + client, err := logstashTest.createHttpClient() + + if err != nil { + test.Logf("Can't createHttpClient") + } + logstashTest.client = client + } + + if err := logstashTest.gatherPipelineStats(logstashTest.URL+pipelineStats, &logstash5accPipelineStats); err != nil { + test.Logf("Can't gather Pipeline stats") + } + + logstash5accPipelineStats.AssertContainsTaggedFields( + test, + "logstash_events", + map[string]interface{}{ + "duration_in_millis": float64(1151.0), + "in": float64(1269.0), + "filtered": float64(1269.0), + "out": float64(1269.0), + }, + map[string]string{ + "node_id": string("a360d8cf-6289-429d-8419-6145e324b574"), + "node_name": string("node-5-test"), + "node_host": string("node-5"), + "node_version": string("5.3.0"), + }, + ) + + logstash5accPipelineStats.AssertContainsTaggedFields( + test, + "logstash_plugins", + map[string]interface{}{ + "queue_push_duration_in_millis": float64(32.0), + "out": float64(2.0), + }, + map[string]string{ + "node_id": string("a360d8cf-6289-429d-8419-6145e324b574"), + "node_name": string("node-5-test"), + "node_host": string("node-5"), + "node_version": string("5.3.0"), + "plugin_name": string("beats"), + "plugin_id": string("a35197a509596954e905e38521bae12b1498b17d-1"), + "plugin_type": string("input"), + }, + ) + + logstash5accPipelineStats.AssertContainsTaggedFields( + test, + "logstash_plugins", + map[string]interface{}{ + "duration_in_millis": float64(360.0), + "in": float64(1269.0), + "out": float64(1269.0), + }, + map[string]string{ + "node_id": string("a360d8cf-6289-429d-8419-6145e324b574"), + "node_name": string("node-5-test"), + "node_host": string("node-5"), + "node_version": string("5.3.0"), + "plugin_name": string("stdout"), + "plugin_id": string("582d5c2becb582a053e1e9a6bcc11d49b69a6dfd-2"), + "plugin_type": string("output"), + }, + ) + + logstash5accPipelineStats.AssertContainsTaggedFields( + test, + "logstash_plugins", + map[string]interface{}{ + "duration_in_millis": float64(228.0), + "in": float64(1269.0), + "out": float64(1269.0), + }, + map[string]string{ + "node_id": string("a360d8cf-6289-429d-8419-6145e324b574"), + "node_name": string("node-5-test"), + "node_host": string("node-5"), + "node_version": string("5.3.0"), + "plugin_name": string("s3"), + "plugin_id": string("582d5c2becb582a053e1e9a6bcc11d49b69a6dfd-3"), + "plugin_type": string("output"), + }, + ) +} + +func Test_Logstash6GatherPipelinesStats(test *testing.T) { + //logstash6accPipelinesStats.SetDebug(true) + fakeServer := httptest.NewUnstartedServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + writer.Header().Set("Content-Type", "application/json") + fmt.Fprintf(writer, "%s", string(logstash6PipelinesJSON)) + })) + requestURL, err := url.Parse(logstashTest.URL) + if err != nil { + test.Logf("Can't connect to: %s", logstashTest.URL) + } + fakeServer.Listener, _ = net.Listen("tcp", fmt.Sprintf("%s:%s", requestURL.Hostname(), requestURL.Port())) + fakeServer.Start() + defer fakeServer.Close() + + if logstashTest.client == nil { + client, err := logstashTest.createHttpClient() + + if err != nil { + test.Logf("Can't createHttpClient") + } + logstashTest.client = client + } + + if err := logstashTest.gatherPipelinesStats(logstashTest.URL+pipelineStats, &logstash6accPipelinesStats); err != nil { + test.Logf("Can't gather Pipeline stats") + } + + logstash6accPipelinesStats.AssertContainsTaggedFields( + test, + "logstash_events", + map[string]interface{}{ + "duration_in_millis": float64(8540751.0), + "queue_push_duration_in_millis": float64(366.0), + "in": float64(180659.0), + "filtered": float64(180659.0), + "out": float64(180659.0), + }, + map[string]string{ + "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), + "node_name": string("node-6-test"), + "node_host": string("node-6"), + "node_version": string("6.4.2"), + "pipeline": string("main"), + }, + ) + + logstash6accPipelinesStats.AssertContainsTaggedFields( + test, + "logstash_plugins", + map[string]interface{}{ + "queue_push_duration_in_millis": float64(366.0), + "out": float64(180659.0), + }, + map[string]string{ + "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), + "node_name": string("node-6-test"), + "node_host": string("node-6"), + "node_version": string("6.4.2"), + "pipeline": string("main"), + "plugin_name": string("kafka"), + "plugin_id": string("input-kafka"), + "plugin_type": string("input"), + }, + ) + + logstash6accPipelinesStats.AssertContainsTaggedFields( + test, + "logstash_plugins", + map[string]interface{}{ + "duration_in_millis": float64(2117.0), + "in": float64(27641.0), + "out": float64(27641.0), + }, + map[string]string{ + "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), + "node_name": string("node-6-test"), + "node_host": string("node-6"), + "node_version": string("6.4.2"), + "pipeline": string("main"), + "plugin_name": string("mutate"), + "plugin_id": string("155b0ad18abbf3df1e0cb7bddef0d77c5ba699efe5a0f8a28502d140549baf54"), + "plugin_type": string("filter"), + }, + ) + + logstash6accPipelinesStats.AssertContainsTaggedFields( + test, + "logstash_plugins", + map[string]interface{}{ + "duration_in_millis": float64(2117.0), + "in": float64(27641.0), + "out": float64(27641.0), + }, + map[string]string{ + "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), + "node_name": string("node-6-test"), + "node_host": string("node-6"), + "node_version": string("6.4.2"), + "pipeline": string("main"), + "plugin_name": string("mutate"), + "plugin_id": string("155b0ad18abbf3df1e0cb7bddef0d77c5ba699efe5a0f8a28502d140549baf54"), + "plugin_type": string("filter"), + }, + ) + + logstash6accPipelinesStats.AssertContainsTaggedFields( + test, + "logstash_plugins", + map[string]interface{}{ + "duration_in_millis": float64(13149.0), + "in": float64(180659.0), + "out": float64(177549.0), + }, + map[string]string{ + "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), + "node_name": string("node-6-test"), + "node_host": string("node-6"), + "node_version": string("6.4.2"), + "pipeline": string("main"), + "plugin_name": string("date"), + "plugin_id": string("d079424bb6b7b8c7c61d9c5e0ddae445e92fa9ffa2e8690b0a669f7c690542f0"), + "plugin_type": string("filter"), + }, + ) + + logstash6accPipelinesStats.AssertContainsTaggedFields( + test, + "logstash_plugins", + map[string]interface{}{ + "duration_in_millis": float64(2814.0), + "in": float64(76602.0), + "out": float64(76602.0), + }, + map[string]string{ + "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), + "node_name": string("node-6-test"), + "node_host": string("node-6"), + "node_version": string("6.4.2"), + "pipeline": string("main"), + "plugin_name": string("mutate"), + "plugin_id": string("25afa60ab6dc30512fe80efa3493e4928b5b1b109765b7dc46a3e4bbf293d2d4"), + "plugin_type": string("filter"), + }, + ) + + logstash6accPipelinesStats.AssertContainsTaggedFields( + test, + "logstash_plugins", + map[string]interface{}{ + "duration_in_millis": float64(9.0), + "in": float64(934.0), + "out": float64(934.0), + }, + map[string]string{ + "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), + "node_name": string("node-6-test"), + "node_host": string("node-6"), + "node_version": string("6.4.2"), + "pipeline": string("main"), + "plugin_name": string("mutate"), + "plugin_id": string("2d9fa8f74eeb137bfa703b8050bad7d76636fface729e4585b789b5fc9bed668"), + "plugin_type": string("filter"), + }, + ) + + logstash6accPipelinesStats.AssertContainsTaggedFields( + test, + "logstash_plugins", + map[string]interface{}{ + "duration_in_millis": float64(173.0), + "in": float64(3110.0), + "out": float64(0.0), + }, + map[string]string{ + "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), + "node_name": string("node-6-test"), + "node_host": string("node-6"), + "node_version": string("6.4.2"), + "pipeline": string("main"), + "plugin_name": string("drop"), + "plugin_id": string("4ed14c9ef0198afe16c31200041e98d321cb5c2e6027e30b077636b8c4842110"), + "plugin_type": string("filter"), + }, + ) + + logstash6accPipelinesStats.AssertContainsTaggedFields( + test, + "logstash_plugins", + map[string]interface{}{ + "duration_in_millis": float64(5605.0), + "in": float64(75482.0), + "out": float64(75482.0), + }, + map[string]string{ + "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), + "node_name": string("node-6-test"), + "node_host": string("node-6"), + "node_version": string("6.4.2"), + "pipeline": string("main"), + "plugin_name": string("mutate"), + "plugin_id": string("358ce1eb387de7cd5711c2fb4de64cd3b12e5ca9a4c45f529516bcb053a31df4"), + "plugin_type": string("filter"), + }, + ) + + logstash6accPipelinesStats.AssertContainsTaggedFields( + test, + "logstash_plugins", + map[string]interface{}{ + "duration_in_millis": float64(313992.0), + "in": float64(180659.0), + "out": float64(180659.0), + }, + map[string]string{ + "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), + "node_name": string("node-6-test"), + "node_host": string("node-6"), + "node_version": string("6.4.2"), + "pipeline": string("main"), + "plugin_name": string("csv"), + "plugin_id": string("82a9bbb02fff37a63c257c1f146b0a36273c7cbbebe83c0a51f086e5280bf7bb"), + "plugin_type": string("filter"), + }, + ) + + logstash6accPipelinesStats.AssertContainsTaggedFields( + test, + "logstash_plugins", + map[string]interface{}{ + "duration_in_millis": float64(0.0), + "in": float64(0.0), + "out": float64(0.0), + }, + map[string]string{ + "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), + "node_name": string("node-6-test"), + "node_host": string("node-6"), + "node_version": string("6.4.2"), + "pipeline": string("main"), + "plugin_name": string("mutate"), + "plugin_id": string("8fb13a8cdd4257b52724d326aa1549603ffdd4e4fde6d20720c96b16238c18c3"), + "plugin_type": string("filter"), + }, + ) + + logstash6accPipelinesStats.AssertContainsTaggedFields( + test, + "logstash_plugins", + map[string]interface{}{ + "duration_in_millis": float64(651386.0), + "in": float64(177549.0), + "out": float64(177549.0), + }, + map[string]string{ + "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), + "node_name": string("node-6-test"), + "node_host": string("node-6"), + "node_version": string("6.4.2"), + "pipeline": string("main"), + "plugin_name": string("elasticsearch"), + "plugin_id": string("output-elk"), + "plugin_type": string("output"), + }, + ) + + logstash6accPipelinesStats.AssertContainsTaggedFields( + test, + "logstash_plugins", + map[string]interface{}{ + "duration_in_millis": float64(186751.0), + "in": float64(177549.0), + "out": float64(177549.0), + }, + map[string]string{ + "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), + "node_name": string("node-6-test"), + "node_host": string("node-6"), + "node_version": string("6.4.2"), + "pipeline": string("main"), + "plugin_name": string("kafka"), + "plugin_id": string("output-kafka1"), + "plugin_type": string("output"), + }, + ) + + logstash6accPipelinesStats.AssertContainsTaggedFields( + test, + "logstash_plugins", + map[string]interface{}{ + "duration_in_millis": float64(7335196.0), + "in": float64(177549.0), + "out": float64(177549.0), + }, + map[string]string{ + "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), + "node_name": string("node-6-test"), + "node_host": string("node-6"), + "node_version": string("6.4.2"), + "pipeline": string("main"), + "plugin_name": string("kafka"), + "plugin_id": string("output-kafka2"), + "plugin_type": string("output"), + }, + ) + + logstash6accPipelinesStats.AssertContainsTaggedFields( + test, + "logstash_queue", + map[string]interface{}{ + "events": float64(103), + "free_space_in_bytes": float64(36307369984), + "max_queue_size_in_bytes": float64(1073741824), + "max_unread_events": float64(0), + "page_capacity_in_bytes": float64(67108864), + "queue_size_in_bytes": float64(1872391), + }, + map[string]string{ + "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), + "node_name": string("node-6-test"), + "node_host": string("node-6"), + "node_version": string("6.4.2"), + "pipeline": string("main"), + "queue_type": string("persisted"), + }, + ) + +} + +func Test_Logstash5GatherJVMStats(test *testing.T) { + fakeServer := httptest.NewUnstartedServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + writer.Header().Set("Content-Type", "application/json") + fmt.Fprintf(writer, "%s", string(logstash5JvmJSON)) + })) + requestURL, err := url.Parse(logstashTest.URL) + if err != nil { + test.Logf("Can't connect to: %s", logstashTest.URL) + } + fakeServer.Listener, _ = net.Listen("tcp", fmt.Sprintf("%s:%s", requestURL.Hostname(), requestURL.Port())) + fakeServer.Start() + defer fakeServer.Close() + + if logstashTest.client == nil { + client, err := logstashTest.createHttpClient() + + if err != nil { + test.Logf("Can't createHttpClient") + } + logstashTest.client = client + } + + if err := logstashTest.gatherJVMStats(logstashTest.URL+jvmStats, &logstash5accJVMStats); err != nil { + test.Logf("Can't gather JVM stats") + } + + logstash5accJVMStats.AssertContainsTaggedFields( + test, + "logstash_jvm", + map[string]interface{}{ + "mem_pools_young_max_in_bytes": float64(5.5836672e+08), + "mem_pools_young_committed_in_bytes": float64(1.43261696e+08), + "mem_heap_committed_in_bytes": float64(5.1904512e+08), + "threads_count": float64(29.0), + "mem_pools_old_peak_used_in_bytes": float64(1.27900864e+08), + "mem_pools_old_peak_max_in_bytes": float64(7.2482816e+08), + "mem_heap_used_percent": float64(16.0), + "gc_collectors_young_collection_time_in_millis": float64(3235.0), + "mem_pools_survivor_committed_in_bytes": float64(1.7825792e+07), + "mem_pools_young_used_in_bytes": float64(7.6049384e+07), + "mem_non_heap_committed_in_bytes": float64(2.91487744e+08), + "mem_pools_survivor_peak_max_in_bytes": float64(3.4865152e+07), + "mem_pools_young_peak_max_in_bytes": float64(2.7918336e+08), + "uptime_in_millis": float64(4.803461e+06), + "mem_pools_survivor_peak_used_in_bytes": float64(8.912896e+06), + "mem_pools_survivor_max_in_bytes": float64(6.9730304e+07), + "gc_collectors_old_collection_count": float64(2.0), + "mem_pools_survivor_used_in_bytes": float64(9.419672e+06), + "mem_pools_old_used_in_bytes": float64(2.55801728e+08), + "mem_pools_old_max_in_bytes": float64(1.44965632e+09), + "mem_pools_young_peak_used_in_bytes": float64(7.1630848e+07), + "mem_heap_used_in_bytes": float64(3.41270784e+08), + "mem_heap_max_in_bytes": float64(2.077753344e+09), + "gc_collectors_young_collection_count": float64(616.0), + "threads_peak_count": float64(31.0), + "mem_pools_old_committed_in_bytes": float64(3.57957632e+08), + "gc_collectors_old_collection_time_in_millis": float64(114.0), + "mem_non_heap_used_in_bytes": float64(2.68905936e+08), + }, + map[string]string{ + "node_id": string("a360d8cf-6289-429d-8419-6145e324b574"), + "node_name": string("node-5-test"), + "node_host": string("node-5"), + "node_version": string("5.3.0"), + }, + ) + +} + +func Test_Logstash6GatherJVMStats(test *testing.T) { + fakeServer := httptest.NewUnstartedServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + writer.Header().Set("Content-Type", "application/json") + fmt.Fprintf(writer, "%s", string(logstash6JvmJSON)) + })) + requestURL, err := url.Parse(logstashTest.URL) + if err != nil { + test.Logf("Can't connect to: %s", logstashTest.URL) + } + fakeServer.Listener, _ = net.Listen("tcp", fmt.Sprintf("%s:%s", requestURL.Hostname(), requestURL.Port())) + fakeServer.Start() + defer fakeServer.Close() + + if logstashTest.client == nil { + client, err := logstashTest.createHttpClient() + + if err != nil { + test.Logf("Can't createHttpClient") + } + logstashTest.client = client + } + + if err := logstashTest.gatherJVMStats(logstashTest.URL+jvmStats, &logstash6accJVMStats); err != nil { + test.Logf("Can't gather JVM stats") + } + + logstash6accJVMStats.AssertContainsTaggedFields( + test, + "logstash_jvm", + map[string]interface{}{ + "mem_pools_young_max_in_bytes": float64(1605304320.0), + "mem_pools_young_committed_in_bytes": float64(71630848.0), + "mem_heap_committed_in_bytes": float64(824963072.0), + "threads_count": float64(60.0), + "mem_pools_old_peak_used_in_bytes": float64(696572600.0), + "mem_pools_old_peak_max_in_bytes": float64(6583418880.0), + "mem_heap_used_percent": float64(2.0), + "gc_collectors_young_collection_time_in_millis": float64(107321.0), + "mem_pools_survivor_committed_in_bytes": float64(8912896.0), + "mem_pools_young_used_in_bytes": float64(11775120.0), + "mem_non_heap_committed_in_bytes": float64(222986240.0), + "mem_pools_survivor_peak_max_in_bytes": float64(200605696), + "mem_pools_young_peak_max_in_bytes": float64(1605304320.0), + "uptime_in_millis": float64(281850926.0), + "mem_pools_survivor_peak_used_in_bytes": float64(8912896.0), + "mem_pools_survivor_max_in_bytes": float64(200605696.0), + "gc_collectors_old_collection_count": float64(37.0), + "mem_pools_survivor_used_in_bytes": float64(835008.0), + "mem_pools_old_used_in_bytes": float64(189750576.0), + "mem_pools_old_max_in_bytes": float64(6583418880.0), + "mem_pools_young_peak_used_in_bytes": float64(71630848.0), + "mem_heap_used_in_bytes": float64(202360704.0), + "mem_heap_max_in_bytes": float64(8389328896.0), + "gc_collectors_young_collection_count": float64(2094.0), + "threads_peak_count": float64(62.0), + "mem_pools_old_committed_in_bytes": float64(744419328.0), + "gc_collectors_old_collection_time_in_millis": float64(7492.0), + "mem_non_heap_used_in_bytes": float64(197878896.0), + }, + map[string]string{ + "node_id": string("3044f675-21ce-4335-898a-8408aa678245"), + "node_name": string("node-6-test"), + "node_host": string("node-6"), + "node_version": string("6.4.2"), + }, + ) + +} + +func Test_LogstashRequests(test *testing.T) { + fakeServer := httptest.NewUnstartedServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + writer.Header().Set("Content-Type", "application/json") + fmt.Fprintf(writer, "%s", string(logstash6JvmJSON)) + assert.Equal(test, request.Host, "logstash.test.local") + assert.Equal(test, request.Method, "POST") + assert.Equal(test, request.Header.Get("X-Test"), "test-header") + })) + requestURL, err := url.Parse(logstashTest.URL) + if err != nil { + test.Logf("Can't connect to: %s", logstashTest.URL) + } + fakeServer.Listener, _ = net.Listen("tcp", fmt.Sprintf("%s:%s", requestURL.Hostname(), requestURL.Port())) + fakeServer.Start() + defer fakeServer.Close() + + if logstashTest.client == nil { + client, err := logstashTest.createHttpClient() + + if err != nil { + test.Logf("Can't createHttpClient") + } + logstashTest.client = client + } + + logstashTest.Method = "POST" + logstashTest.Headers["X-Test"] = "test-header" + logstashTest.HostHeader = "logstash.test.local" + + if err := logstashTest.gatherJsonData(logstashTest.URL+jvmStats, &logstash6accJVMStats); err != nil { + test.Logf("Can't gather JVM stats") + } +} diff --git a/plugins/inputs/logstash/samples_logstash5.go b/plugins/inputs/logstash/samples_logstash5.go new file mode 100644 index 0000000000000..598f6dab5e9df --- /dev/null +++ b/plugins/inputs/logstash/samples_logstash5.go @@ -0,0 +1,156 @@ +package logstash + +const logstash5ProcessJSON = ` +{ + "host" : "node-5", + "version" : "5.3.0", + "http_address" : "0.0.0.0:9600", + "id" : "a360d8cf-6289-429d-8419-6145e324b574", + "name" : "node-5-test", + "process" : { + "open_file_descriptors" : 89, + "peak_open_file_descriptors" : 100, + "max_file_descriptors" : 1048576, + "mem" : { + "total_virtual_in_bytes" : 4809506816 + }, + "cpu" : { + "total_in_millis" : 155260000000, + "percent" : 3, + "load_average" : { + "1m" : 0.49, + "5m" : 0.61, + "15m" : 0.54 + } + } + } +} +` + +const logstash5JvmJSON = ` +{ + "host" : "node-5", + "version" : "5.3.0", + "http_address" : "0.0.0.0:9600", + "id" : "a360d8cf-6289-429d-8419-6145e324b574", + "name" : "node-5-test", + "jvm" : { + "threads" : { + "count" : 29, + "peak_count" : 31 + }, + "mem" : { + "heap_used_in_bytes" : 341270784, + "heap_used_percent" : 16, + "heap_committed_in_bytes" : 519045120, + "heap_max_in_bytes" : 2077753344, + "non_heap_used_in_bytes" : 268905936, + "non_heap_committed_in_bytes" : 291487744, + "pools" : { + "survivor" : { + "peak_used_in_bytes" : 8912896, + "used_in_bytes" : 9419672, + "peak_max_in_bytes" : 34865152, + "max_in_bytes" : 69730304, + "committed_in_bytes" : 17825792 + }, + "old" : { + "peak_used_in_bytes" : 127900864, + "used_in_bytes" : 255801728, + "peak_max_in_bytes" : 724828160, + "max_in_bytes" : 1449656320, + "committed_in_bytes" : 357957632 + }, + "young" : { + "peak_used_in_bytes" : 71630848, + "used_in_bytes" : 76049384, + "peak_max_in_bytes" : 279183360, + "max_in_bytes" : 558366720, + "committed_in_bytes" : 143261696 + } + } + }, + "gc" : { + "collectors" : { + "old" : { + "collection_time_in_millis" : 114, + "collection_count" : 2 + }, + "young" : { + "collection_time_in_millis" : 3235, + "collection_count" : 616 + } + } + }, + "uptime_in_millis" : 4803461 + } +} +` + +const logstash5PipelineJSON = ` +{ + "host" : "node-5", + "version" : "5.3.0", + "http_address" : "0.0.0.0:9600", + "id" : "a360d8cf-6289-429d-8419-6145e324b574", + "name" : "node-5-test", + "pipeline" : { + "events" : { + "duration_in_millis" : 1151, + "in" : 1269, + "filtered" : 1269, + "out" : 1269 + }, + "plugins" : { + "inputs" : [ { + "id" : "a35197a509596954e905e38521bae12b1498b17d-1", + "events" : { + "out" : 2, + "queue_push_duration_in_millis" : 32 + }, + "name" : "beats" + } ], + "filters" : [ ], + "outputs" : [ { + "id" : "582d5c2becb582a053e1e9a6bcc11d49b69a6dfd-3", + "events" : { + "duration_in_millis" : 228, + "in" : 1269, + "out" : 1269 + }, + "name" : "s3" + }, { + "id" : "582d5c2becb582a053e1e9a6bcc11d49b69a6dfd-2", + "events" : { + "duration_in_millis" : 360, + "in" : 1269, + "out" : 1269 + }, + "name" : "stdout" + } ] + }, + "reloads" : { + "last_error" : null, + "successes" : 0, + "last_success_timestamp" : null, + "last_failure_timestamp" : null, + "failures" : 0 + }, + "queue" : { + "events" : 208, + "type" : "persisted", + "capacity" : { + "page_capacity_in_bytes" : 262144000, + "max_queue_size_in_bytes" : 8589934592, + "max_unread_events" : 0 + }, + "data" : { + "path" : "/path/to/data/queue", + "free_space_in_bytes" : 89280552960, + "storage_type" : "hfs" + } + }, + "id" : "main" + } +} +` diff --git a/plugins/inputs/logstash/samples_logstash6.go b/plugins/inputs/logstash/samples_logstash6.go new file mode 100644 index 0000000000000..16df2b0fdd33e --- /dev/null +++ b/plugins/inputs/logstash/samples_logstash6.go @@ -0,0 +1,256 @@ +package logstash + +const logstash6ProcessJSON = ` +{ + "host" : "node-6", + "version" : "6.4.2", + "http_address" : "127.0.0.1:9600", + "id" : "3044f675-21ce-4335-898a-8408aa678245", + "name" : "node-6-test", + "process" : { + "open_file_descriptors" : 133, + "peak_open_file_descriptors" : 145, + "max_file_descriptors" : 262144, + "mem" : { + "total_virtual_in_bytes" : 17923452928 + }, + "cpu" : { + "total_in_millis" : 5841460, + "percent" : 0, + "load_average" : { + "1m" : 48.2, + "5m" : 42.4, + "15m" : 38.95 + } + } + } +} +` +const logstash6JvmJSON = ` +{ + "host" : "node-6", + "version" : "6.4.2", + "http_address" : "127.0.0.1:9600", + "id" : "3044f675-21ce-4335-898a-8408aa678245", + "name" : "node-6-test", + "jvm" : { + "threads" : { + "count" : 60, + "peak_count" : 62 + }, + "mem" : { + "heap_used_percent" : 2, + "heap_committed_in_bytes" : 824963072, + "heap_max_in_bytes" : 8389328896, + "heap_used_in_bytes" : 202360704, + "non_heap_used_in_bytes" : 197878896, + "non_heap_committed_in_bytes" : 222986240, + "pools" : { + "survivor" : { + "peak_used_in_bytes" : 8912896, + "used_in_bytes" : 835008, + "peak_max_in_bytes" : 200605696, + "max_in_bytes" : 200605696, + "committed_in_bytes" : 8912896 + }, + "old" : { + "peak_used_in_bytes" : 696572600, + "used_in_bytes" : 189750576, + "peak_max_in_bytes" : 6583418880, + "max_in_bytes" : 6583418880, + "committed_in_bytes" : 744419328 + }, + "young" : { + "peak_used_in_bytes" : 71630848, + "used_in_bytes" : 11775120, + "peak_max_in_bytes" : 1605304320, + "max_in_bytes" : 1605304320, + "committed_in_bytes" : 71630848 + } + } + }, + "gc" : { + "collectors" : { + "old" : { + "collection_time_in_millis" : 7492, + "collection_count" : 37 + }, + "young" : { + "collection_time_in_millis" : 107321, + "collection_count" : 2094 + } + } + }, + "uptime_in_millis" : 281850926 + } +} +` + +const logstash6PipelinesJSON = ` +{ + "host" : "node-6", + "version" : "6.4.2", + "http_address" : "127.0.0.1:9600", + "id" : "3044f675-21ce-4335-898a-8408aa678245", + "name" : "node-6-test", + "pipelines" : { + "main" : { + "events" : { + "duration_in_millis" : 8540751, + "in" : 180659, + "out" : 180659, + "filtered" : 180659, + "queue_push_duration_in_millis" : 366 + }, + "plugins" : { + "inputs" : [ + { + "id" : "input-kafka", + "events" : { + "out" : 180659, + "queue_push_duration_in_millis" : 366 + }, + "name" : "kafka" + } + ], + "filters" : [ + { + "id" : "155b0ad18abbf3df1e0cb7bddef0d77c5ba699efe5a0f8a28502d140549baf54", + "events" : { + "duration_in_millis" : 2117, + "in" : 27641, + "out" : 27641 + }, + "name" : "mutate" + }, + { + "id" : "d079424bb6b7b8c7c61d9c5e0ddae445e92fa9ffa2e8690b0a669f7c690542f0", + "events" : { + "duration_in_millis" : 13149, + "in" : 180659, + "out" : 177549 + }, + "matches" : 177546, + "failures" : 2, + "name" : "date" + }, + { + "id" : "25afa60ab6dc30512fe80efa3493e4928b5b1b109765b7dc46a3e4bbf293d2d4", + "events" : { + "duration_in_millis" : 2814, + "in" : 76602, + "out" : 76602 + }, + "name" : "mutate" + }, + { + "id" : "2d9fa8f74eeb137bfa703b8050bad7d76636fface729e4585b789b5fc9bed668", + "events" : { + "duration_in_millis" : 9, + "in" : 934, + "out" : 934 + }, + "name" : "mutate" + }, + { + "id" : "4ed14c9ef0198afe16c31200041e98d321cb5c2e6027e30b077636b8c4842110", + "events" : { + "duration_in_millis" : 173, + "in" : 3110, + "out" : 0 + }, + "name" : "drop" + }, + { + "id" : "358ce1eb387de7cd5711c2fb4de64cd3b12e5ca9a4c45f529516bcb053a31df4", + "events" : { + "duration_in_millis" : 5605, + "in" : 75482, + "out" : 75482 + }, + "name" : "mutate" + }, + { + "id" : "82a9bbb02fff37a63c257c1f146b0a36273c7cbbebe83c0a51f086e5280bf7bb", + "events" : { + "duration_in_millis" : 313992, + "in" : 180659, + "out" : 180659 + }, + "name" : "csv" + }, + { + "id" : "8fb13a8cdd4257b52724d326aa1549603ffdd4e4fde6d20720c96b16238c18c3", + "events" : { + "duration_in_millis" : 0, + "in" : 0, + "out" : 0 + }, + "name" : "mutate" + } + ], + "outputs" : [ + { + "id" : "output-elk", + "documents" : { + "successes" : 221 + }, + "events" : { + "duration_in_millis" : 651386, + "in" : 177549, + "out" : 177549 + }, + "bulk_requests" : { + "successes" : 1, + "responses" : { + "200" : 748 + } + }, + "name" : "elasticsearch" + }, + { + "id" : "output-kafka1", + "events" : { + "duration_in_millis" : 186751, + "in" : 177549, + "out" : 177549 + }, + "name" : "kafka" + }, + { + "id" : "output-kafka2", + "events" : { + "duration_in_millis" : 7335196, + "in" : 177549, + "out" : 177549 + }, + "name" : "kafka" + } + ] + }, + "reloads" : { + "last_error" : null, + "successes" : 0, + "last_success_timestamp" : null, + "last_failure_timestamp" : null, + "failures" : 0 + }, + "queue": { + "events": 103, + "type": "persisted", + "capacity": { + "queue_size_in_bytes": 1872391, + "page_capacity_in_bytes": 67108864, + "max_queue_size_in_bytes": 1073741824, + "max_unread_events": 0 + }, + "data": { + "path": "/var/lib/logstash/queue/main", + "free_space_in_bytes": 36307369984, + "storage_type": "ext4" + } + } + } + } +} +`