From 4b7d11385c97240cf7dee30636bcd3950cd0ba25 Mon Sep 17 00:00:00 2001 From: Aladex Date: Thu, 7 Jan 2021 19:21:09 +0300 Subject: [PATCH] Using mime-type in prometheus parser to handle protocol-buffer responses (#8545) --- plugins/inputs/prometheus/prometheus.go | 2 +- plugins/parsers/prometheus/parser.go | 27 +++++- plugins/parsers/prometheus/parser_test.go | 105 ++++++++++++++++++++++ 3 files changed, 130 insertions(+), 4 deletions(-) diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index 5a7891ceb60ef..8ec316bb8aaf6 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -330,7 +330,7 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error } if p.MetricVersion == 2 { - parser := parser_v2.Parser{} + parser := parser_v2.Parser{Header: resp.Header} metrics, err = parser.Parse(body) } else { metrics, err = Parse(body, resp.Header) diff --git a/plugins/parsers/prometheus/parser.go b/plugins/parsers/prometheus/parser.go index c5355ffe07a8f..e512d1c9934d5 100644 --- a/plugins/parsers/prometheus/parser.go +++ b/plugins/parsers/prometheus/parser.go @@ -4,7 +4,11 @@ import ( "bufio" "bytes" "fmt" + "github.com/matttproud/golang_protobuf_extensions/pbutil" + "io" "math" + "mime" + "net/http" "time" "github.com/influxdata/telegraf" @@ -17,6 +21,7 @@ import ( type Parser struct { DefaultTags map[string]string + Header http.Header } func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { @@ -31,9 +36,25 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { // Prepare output metricFamilies := make(map[string]*dto.MetricFamily) - metricFamilies, err = parser.TextToMetricFamilies(reader) - if err != nil { - return nil, fmt.Errorf("reading text format failed: %s", err) + mediatype, params, err := mime.ParseMediaType(p.Header.Get("Content-Type")) + if err == nil && mediatype == "application/vnd.google.protobuf" && + params["encoding"] == "delimited" && + params["proto"] == "io.prometheus.client.MetricFamily" { + for { + mf := &dto.MetricFamily{} + if _, ierr := pbutil.ReadDelimited(reader, mf); ierr != nil { + if ierr == io.EOF { + break + } + return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", ierr) + } + metricFamilies[mf.GetName()] = mf + } + } else { + metricFamilies, err = parser.TextToMetricFamilies(reader) + if err != nil { + return nil, fmt.Errorf("reading text format failed: %s", err) + } } now := time.Now() diff --git a/plugins/parsers/prometheus/parser_test.go b/plugins/parsers/prometheus/parser_test.go index 74530ef1b9233..8b8a4ad2ff7b0 100644 --- a/plugins/parsers/prometheus/parser_test.go +++ b/plugins/parsers/prometheus/parser_test.go @@ -2,6 +2,9 @@ package prometheus import ( "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" "testing" "time" @@ -344,3 +347,105 @@ func parse(buf []byte) ([]telegraf.Metric, error) { parser := Parser{} return parser.Parse(buf) } + +func TestParserProtobufHeader(t *testing.T) { + var uClient = &http.Client{ + Transport: &http.Transport{ + DisableKeepAlives: true, + }, + } + expected := []telegraf.Metric{ + testutil.MustMetric( + "prometheus", + map[string]string{ + "host": "omsk", + }, + map[string]interface{}{ + "swap_free": 9.77911808e+08, + }, + time.Unix(0, 0), + 2, + ), + testutil.MustMetric( + "prometheus", + map[string]string{ + "host": "omsk", + }, + map[string]interface{}{ + "swap_in": 2.031616e+06, + }, + time.Unix(0, 0), + 1, + ), + testutil.MustMetric( + "prometheus", + map[string]string{ + "host": "omsk", + }, + map[string]interface{}{ + "swap_out": 1.579008e+07, + }, + time.Unix(0, 0), + 1, + ), + testutil.MustMetric( + "prometheus", + map[string]string{ + "host": "omsk", + }, + map[string]interface{}{ + "swap_total": 9.93185792e+08, + }, + time.Unix(0, 0), + 2, + ), + testutil.MustMetric( + "prometheus", + map[string]string{ + "host": "omsk", + }, + map[string]interface{}{ + "swap_used": 1.5273984e+07, + }, + time.Unix(0, 0), + 2, + ), + testutil.MustMetric( + "prometheus", + map[string]string{ + "host": "omsk", + }, + map[string]interface{}{ + "swap_used_percent": 1.5378778193395661, + }, + time.Unix(0, 0), + 2, + ), + } + sampleProtoBufData := []uint8{67, 10, 9, 115, 119, 97, 112, 95, 102, 114, 101, 101, 18, 25, 84, 101, 108, 101, 103, 114, 97, 102, 32, 99, 111, 108, 108, 101, 99, 116, 101, 100, 32, 109, 101, 116, 114, 105, 99, 24, 1, 34, 25, 10, 12, 10, 4, 104, 111, 115, 116, 18, 4, 111, 109, 115, 107, 18, 9, 9, 0, 0, 0, 0, 224, 36, 205, 65, 65, 10, 7, 115, 119, 97, 112, 95, 105, 110, 18, 25, 84, 101, 108, 101, 103, 114, 97, 102, 32, 99, 111, 108, 108, 101, 99, 116, 101, 100, 32, 109, 101, 116, 114, 105, 99, 24, 0, 34, 25, 10, 12, 10, 4, 104, 111, 115, 116, 18, 4, 111, 109, 115, 107, 26, 9, 9, 0, 0, 0, 0, 0, 0, 63, 65, 66, 10, 8, 115, 119, 97, 112, 95, 111, 117, 116, 18, 25, 84, 101, 108, 101, 103, 114, 97, 102, 32, 99, 111, 108, 108, 101, 99, 116, 101, 100, 32, 109, 101, 116, 114, 105, 99, 24, 0, 34, 25, 10, 12, 10, 4, 104, 111, 115, 116, 18, 4, 111, 109, 115, 107, 26, 9, 9, 0, 0, 0, 0, 0, 30, 110, 65, 68, 10, 10, 115, 119, 97, 112, 95, 116, 111, 116, 97, 108, 18, 25, 84, 101, 108, 101, 103, 114, 97, 102, 32, 99, 111, 108, 108, 101, 99, 116, 101, 100, 32, 109, 101, 116, 114, 105, 99, 24, 1, 34, 25, 10, 12, 10, 4, 104, 111, 115, 116, 18, 4, 111, 109, 115, 107, 18, 9, 9, 0, 0, 0, 0, 104, 153, 205, 65, 67, 10, 9, 115, 119, 97, 112, 95, 117, 115, 101, 100, 18, 25, 84, 101, 108, 101, 103, 114, 97, 102, 32, 99, 111, 108, 108, 101, 99, 116, 101, 100, 32, 109, 101, 116, 114, 105, 99, 24, 1, 34, 25, 10, 12, 10, 4, 104, 111, 115, 116, 18, 4, 111, 109, 115, 107, 18, 9, 9, 0, 0, 0, 0, 0, 34, 109, 65, 75, 10, 17, 115, 119, 97, 112, 95, 117, 115, 101, 100, 95, 112, 101, 114, 99, 101, 110, 116, 18, 25, 84, 101, 108, 101, 103, 114, 97, 102, 32, 99, 111, 108, 108, 101, 99, 116, 101, 100, 32, 109, 101, 116, 114, 105, 99, 24, 1, 34, 25, 10, 12, 10, 4, 104, 111, 115, 116, 18, 4, 111, 109, 115, 107, 18, 9, 9, 109, 234, 180, 197, 37, 155, 248, 63} + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; encoding=delimited") + w.Write(sampleProtoBufData) + })) + defer ts.Close() + req, err := http.NewRequest("GET", ts.URL, nil) + if err != nil { + t.Fatalf("unable to create new request '%s': %s", ts.URL, err) + } + var resp *http.Response + resp, err = uClient.Do(req) + if err != nil { + t.Fatalf("error making HTTP request to %s: %s", ts.URL, err) + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatalf("error reading body: %s", err) + } + parser := Parser{Header: resp.Header} + metrics, err := parser.Parse(body) + if err != nil { + t.Fatalf("error reading metrics for %s: %s", ts.URL, err) + } + testutil.RequireMetricsEqual(t, expected, metrics, testutil.IgnoreTime(), testutil.SortMetrics()) +}