Skip to content

Commit

Permalink
change jolokia input to use bulk requests (influxdata#2253)
Browse files Browse the repository at this point in the history
  • Loading branch information
phemmer authored and Vladislav Mugultyanov (Lazada Group) committed May 30, 2017
1 parent ff511a1 commit 756b7e4
Show file tree
Hide file tree
Showing 3 changed files with 202 additions and 126 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ be deprecated eventually.
- [#1820](https://github.com/influxdata/telegraf/issues/1820): easier plugin testing without outputs
- [#2493](https://github.com/influxdata/telegraf/pull/2493): Check signature in the GitHub webhook plugin
- [#2038](https://github.com/influxdata/telegraf/issues/2038): Add papertrail support to webhooks
- [#2253](https://github.com/influxdata/telegraf/pull/2253): Change jolokia plugin to use bulk requests.

### Bugfixes

Expand Down
149 changes: 77 additions & 72 deletions plugins/inputs/jolokia/jolokia.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package jolokia
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
Expand Down Expand Up @@ -130,7 +129,7 @@ func (j *Jolokia) Description() string {
return "Read JMX metrics through Jolokia"
}

func (j *Jolokia) doRequest(req *http.Request) (map[string]interface{}, error) {
func (j *Jolokia) doRequest(req *http.Request) ([]map[string]interface{}, error) {
resp, err := j.jClient.MakeRequest(req)
if err != nil {
return nil, err
Expand All @@ -155,85 +154,81 @@ func (j *Jolokia) doRequest(req *http.Request) (map[string]interface{}, error) {
}

// Unmarshal json
var jsonOut map[string]interface{}
var jsonOut []map[string]interface{}
if err = json.Unmarshal([]byte(body), &jsonOut); err != nil {
return nil, errors.New("Error decoding JSON response")
}

if status, ok := jsonOut["status"]; ok {
if status != float64(200) {
return nil, fmt.Errorf("Not expected status value in response body: %3.f",
status)
}
} else {
return nil, fmt.Errorf("Missing status in response body")
return nil, fmt.Errorf("Error decoding JSON response: %s: %s", err, body)
}

return jsonOut, nil
}

func (j *Jolokia) prepareRequest(server Server, metric Metric) (*http.Request, error) {
func (j *Jolokia) prepareRequest(server Server, metrics []Metric) (*http.Request, error) {
var jolokiaUrl *url.URL
context := j.Context // Usually "/jolokia/"

// Create bodyContent
bodyContent := map[string]interface{}{
"type": "read",
"mbean": metric.Mbean,
}
var bulkBodyContent []map[string]interface{}
for _, metric := range metrics {
// Create bodyContent
bodyContent := map[string]interface{}{
"type": "read",
"mbean": metric.Mbean,
}

if metric.Attribute != "" {
bodyContent["attribute"] = metric.Attribute
if metric.Path != "" {
bodyContent["path"] = metric.Path
if metric.Attribute != "" {
bodyContent["attribute"] = metric.Attribute
if metric.Path != "" {
bodyContent["path"] = metric.Path
}
}
}

// Add target, only in proxy mode
if j.Mode == "proxy" {
serviceUrl := fmt.Sprintf("service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi",
server.Host, server.Port)
// Add target, only in proxy mode
if j.Mode == "proxy" {
serviceUrl := fmt.Sprintf("service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi",
server.Host, server.Port)

target := map[string]string{
"url": serviceUrl,
}
target := map[string]string{
"url": serviceUrl,
}

if server.Username != "" {
target["user"] = server.Username
}
if server.Username != "" {
target["user"] = server.Username
}

if server.Password != "" {
target["password"] = server.Password
}
if server.Password != "" {
target["password"] = server.Password
}

bodyContent["target"] = target
bodyContent["target"] = target

proxy := j.Proxy
proxy := j.Proxy

// Prepare ProxyURL
proxyUrl, err := url.Parse("http://" + proxy.Host + ":" + proxy.Port + context)
if err != nil {
return nil, err
}
if proxy.Username != "" || proxy.Password != "" {
proxyUrl.User = url.UserPassword(proxy.Username, proxy.Password)
}
// Prepare ProxyURL
proxyUrl, err := url.Parse("http://" + proxy.Host + ":" + proxy.Port + context)
if err != nil {
return nil, err
}
if proxy.Username != "" || proxy.Password != "" {
proxyUrl.User = url.UserPassword(proxy.Username, proxy.Password)
}

jolokiaUrl = proxyUrl
jolokiaUrl = proxyUrl

} else {
serverUrl, err := url.Parse("http://" + server.Host + ":" + server.Port + context)
if err != nil {
return nil, err
}
if server.Username != "" || server.Password != "" {
serverUrl.User = url.UserPassword(server.Username, server.Password)
} else {
serverUrl, err := url.Parse("http://" + server.Host + ":" + server.Port + context)
if err != nil {
return nil, err
}
if server.Username != "" || server.Password != "" {
serverUrl.User = url.UserPassword(server.Username, server.Password)
}

jolokiaUrl = serverUrl
}

jolokiaUrl = serverUrl
bulkBodyContent = append(bulkBodyContent, bodyContent)
}

requestBody, err := json.Marshal(bodyContent)
requestBody, err := json.Marshal(bulkBodyContent)

req, err := http.NewRequest("POST", jolokiaUrl.String(), bytes.NewBuffer(requestBody))

Expand Down Expand Up @@ -276,25 +271,35 @@ func (j *Jolokia) Gather(acc telegraf.Accumulator) error {
tags["jolokia_host"] = server.Host
fields := make(map[string]interface{})

for _, metric := range metrics {
measurement := metric.Name
req, err := j.prepareRequest(server, metrics)
if err != nil {
acc.AddError(fmt.Errorf("unable to create request: %s", err))
continue
}
out, err := j.doRequest(req)
if err != nil {
acc.AddError(fmt.Errorf("error performing request: %s", err))
continue
}

req, err := j.prepareRequest(server, metric)
if err != nil {
return err
if len(out) != len(metrics) {
acc.AddError(fmt.Errorf("did not receive the correct number of metrics in response. expected %d, received %d", len(metrics), len(out)))
continue
}
for i, resp := range out {
if status, ok := resp["status"]; ok && status != float64(200) {
acc.AddError(fmt.Errorf("Not expected status value in response body (%s:%s mbean=\"%s\" attribute=\"%s\"): %3.f",
server.Host, server.Port, metrics[i].Mbean, metrics[i].Attribute, status))
continue
} else if !ok {
acc.AddError(fmt.Errorf("Missing status in response body"))
continue
}

out, err := j.doRequest(req)

if err != nil {
fmt.Printf("Error handling response: %s\n", err)
if values, ok := resp["value"]; ok {
j.extractValues(metrics[i].Name, values, fields)
} else {
if values, ok := out["value"]; ok {
j.extractValues(measurement, values, fields)
} else {
fmt.Printf("Missing key 'value' in output response\n")
}

acc.AddError(fmt.Errorf("Missing key 'value' in output response\n"))
}
}

Expand Down
Loading

0 comments on commit 756b7e4

Please sign in to comment.