Skip to content

Commit

Permalink
Fix RabbitMQ regression in #9383 (#9443)
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan authored Jun 29, 2021
1 parent eba6191 commit e2ab218
Show file tree
Hide file tree
Showing 15 changed files with 1,675 additions and 207 deletions.
6 changes: 6 additions & 0 deletions plugins/inputs/rabbitmq/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ For additional details reference the [RabbitMQ Management HTTP Stats][management
## specified, metrics for all exchanges are gathered.
# exchanges = ["telegraf"]

## Metrics to include and exclude. Globs accepted.
## Note that an empty array for both will include all metrics
## Currently the following metrics are supported: "exchange", "federation", "node", "overview", "queue"
# metric_include = []
# metric_exclude = []

## Queues to include and exclude. Globs accepted.
## Note that an empty array for both will include all queues
# queue_name_include = []
Expand Down
198 changes: 137 additions & 61 deletions plugins/inputs/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rabbitmq
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"sync"
Expand Down Expand Up @@ -47,15 +48,18 @@ type RabbitMQ struct {
Queues []string `toml:"queues"`
Exchanges []string `toml:"exchanges"`

MetricInclude []string `toml:"metric_include"`
MetricExclude []string `toml:"metric_exclude"`
QueueInclude []string `toml:"queue_name_include"`
QueueExclude []string `toml:"queue_name_exclude"`
FederationUpstreamInclude []string `toml:"federation_upstream_include"`
FederationUpstreamExclude []string `toml:"federation_upstream_exclude"`

Client *http.Client `toml:"-"`
Log telegraf.Logger `toml:"-"`

filterCreated bool
client *http.Client
excludeEveryQueue bool
metricFilter filter.Filter
queueFilter filter.Filter
upstreamFilter filter.Filter
}
Expand Down Expand Up @@ -163,11 +167,11 @@ type Node struct {
GcNumDetails Details `json:"gc_num_details"`
GcBytesReclaimed int64 `json:"gc_bytes_reclaimed"`
GcBytesReclaimedDetails Details `json:"gc_bytes_reclaimed_details"`
IoReadAvgTime int64 `json:"io_read_avg_time"`
IoReadAvgTime float64 `json:"io_read_avg_time"`
IoReadAvgTimeDetails Details `json:"io_read_avg_time_details"`
IoReadBytes int64 `json:"io_read_bytes"`
IoReadBytesDetails Details `json:"io_read_bytes_details"`
IoWriteAvgTime int64 `json:"io_write_avg_time"`
IoWriteAvgTime float64 `json:"io_write_avg_time"`
IoWriteAvgTimeDetails Details `json:"io_write_avg_time_details"`
IoWriteBytes int64 `json:"io_write_bytes"`
IoWriteBytesDetails Details `json:"io_write_bytes_details"`
Expand Down Expand Up @@ -226,32 +230,44 @@ type MemoryResponse struct {

// Memory details
type Memory struct {
ConnectionReaders int64 `json:"connection_readers"`
ConnectionWriters int64 `json:"connection_writers"`
ConnectionChannels int64 `json:"connection_channels"`
ConnectionOther int64 `json:"connection_other"`
QueueProcs int64 `json:"queue_procs"`
QueueSlaveProcs int64 `json:"queue_slave_procs"`
Plugins int64 `json:"plugins"`
OtherProc int64 `json:"other_proc"`
Metrics int64 `json:"metrics"`
MgmtDb int64 `json:"mgmt_db"`
Mnesia int64 `json:"mnesia"`
OtherEts int64 `json:"other_ets"`
Binary int64 `json:"binary"`
MsgIndex int64 `json:"msg_index"`
Code int64 `json:"code"`
Atom int64 `json:"atom"`
OtherSystem int64 `json:"other_system"`
AllocatedUnused int64 `json:"allocated_unused"`
ReservedUnallocated int64 `json:"reserved_unallocated"`
Total int64 `json:"total"`
ConnectionReaders int64 `json:"connection_readers"`
ConnectionWriters int64 `json:"connection_writers"`
ConnectionChannels int64 `json:"connection_channels"`
ConnectionOther int64 `json:"connection_other"`
QueueProcs int64 `json:"queue_procs"`
QueueSlaveProcs int64 `json:"queue_slave_procs"`
Plugins int64 `json:"plugins"`
OtherProc int64 `json:"other_proc"`
Metrics int64 `json:"metrics"`
MgmtDb int64 `json:"mgmt_db"`
Mnesia int64 `json:"mnesia"`
OtherEts int64 `json:"other_ets"`
Binary int64 `json:"binary"`
MsgIndex int64 `json:"msg_index"`
Code int64 `json:"code"`
Atom int64 `json:"atom"`
OtherSystem int64 `json:"other_system"`
AllocatedUnused int64 `json:"allocated_unused"`
ReservedUnallocated int64 `json:"reserved_unallocated"`
Total interface{} `json:"total"`
}

// Error response
type ErrorResponse struct {
Error string `json:"error"`
Reason string `json:"reason"`
}

// gatherFunc ...
type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator)

var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues, gatherExchanges, gatherFederationLinks}
var gatherFunctions = map[string]gatherFunc{
"exchange": gatherExchanges,
"federation": gatherFederationLinks,
"node": gatherNodes,
"overview": gatherOverview,
"queue": gatherQueues,
}

var sampleConfig = `
## Management Plugin url. (default: http://localhost:15672)
Expand Down Expand Up @@ -291,6 +307,12 @@ var sampleConfig = `
## specified, metrics for all exchanges are gathered.
# exchanges = ["telegraf"]
## Metrics to include and exclude. Globs accepted.
## Note that an empty array for both will include all metrics
## Currently the following metrics are supported: "exchange", "federation", "node", "overview", "queue"
# metric_include = []
# metric_exclude = []
## Queues to include and exclude. Globs accepted.
## Note that an empty array for both will include all queues
queue_name_include = []
Expand Down Expand Up @@ -323,39 +345,47 @@ func (r *RabbitMQ) Description() string {
return "Reads metrics from RabbitMQ servers via the Management Plugin"
}

// Gather ...
func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error {
if r.Client == nil {
tlsCfg, err := r.ClientConfig.TLSConfig()
if err != nil {
return err
}
tr := &http.Transport{
ResponseHeaderTimeout: time.Duration(r.ResponseHeaderTimeout),
TLSClientConfig: tlsCfg,
}
r.Client = &http.Client{
Transport: tr,
Timeout: time.Duration(r.ClientTimeout),
}
func (r *RabbitMQ) Init() error {
var err error

// Create gather filters
if err := r.createQueueFilter(); err != nil {
return err
}
if err := r.createUpstreamFilter(); err != nil {
return err
}

// Create gather filters if not already created
if !r.filterCreated {
err := r.createQueueFilter()
if err != nil {
return err
}
err = r.createUpstreamFilter()
if err != nil {
return err
}
r.filterCreated = true
// Create a filter for the metrics
if r.metricFilter, err = filter.NewIncludeExcludeFilter(r.MetricInclude, r.MetricExclude); err != nil {
return err
}

tlsCfg, err := r.ClientConfig.TLSConfig()
if err != nil {
return err
}
tr := &http.Transport{
ResponseHeaderTimeout: time.Duration(r.ResponseHeaderTimeout),
TLSClientConfig: tlsCfg,
}
r.client = &http.Client{
Transport: tr,
Timeout: time.Duration(r.ClientTimeout),
}

return nil
}

// Gather ...
func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
wg.Add(len(gatherFunctions))
for _, f := range gatherFunctions {
for name, f := range gatherFunctions {
// Query only metrics that are supported
if !r.metricFilter.Match(name) {
continue
}
wg.Add(1)
go func(gf gatherFunc) {
defer wg.Done()
gf(r, acc)
Expand All @@ -366,15 +396,16 @@ func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error {
return nil
}

func (r *RabbitMQ) requestJSON(u string, target interface{}) error {
func (r *RabbitMQ) requestEndpoint(u string) ([]byte, error) {
if r.URL == "" {
r.URL = DefaultURL
}
u = fmt.Sprintf("%s%s", r.URL, u)
endpoint := r.URL + u
r.Log.Debugf("Requesting %q...", endpoint)

req, err := http.NewRequest("GET", u, nil)
req, err := http.NewRequest("GET", endpoint, nil)
if err != nil {
return err
return nil, err
}

username := r.Username
Expand All @@ -389,14 +420,39 @@ func (r *RabbitMQ) requestJSON(u string, target interface{}) error {

req.SetBasicAuth(username, password)

resp, err := r.Client.Do(req)
resp, err := r.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

r.Log.Debugf("HTTP status code: %v %v", resp.StatusCode, http.StatusText(resp.StatusCode))
if resp.StatusCode < 200 || resp.StatusCode > 299 {
return nil, fmt.Errorf("getting %q failed: %v %v", u, resp.StatusCode, http.StatusText(resp.StatusCode))
}

return ioutil.ReadAll(resp.Body)
}

func (r *RabbitMQ) requestJSON(u string, target interface{}) error {
buf, err := r.requestEndpoint(u)
if err != nil {
return err
}
if err := json.Unmarshal(buf, target); err != nil {
if _, ok := err.(*json.UnmarshalTypeError); ok {
// Try to get the error reason from the response
var errResponse ErrorResponse
if json.Unmarshal(buf, &errResponse) == nil && errResponse.Error != "" {
// Return the error reason in the response
return fmt.Errorf("error response trying to get %q: %q (reason: %q)", u, errResponse.Error, errResponse.Reason)
}
}

defer resp.Body.Close()
return fmt.Errorf("decoding answer from %q failed: %v", u, err)
}

return json.NewDecoder(resp.Body).Decode(target)
return nil
}

func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
Expand Down Expand Up @@ -533,7 +589,27 @@ func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
fields["mem_other_system"] = memory.Memory.OtherSystem
fields["mem_allocated_unused"] = memory.Memory.AllocatedUnused
fields["mem_reserved_unallocated"] = memory.Memory.ReservedUnallocated
fields["mem_total"] = memory.Memory.Total
switch v := memory.Memory.Total.(type) {
case float64:
fields["mem_total"] = int64(v)
case map[string]interface{}:
var foundEstimator bool
for _, estimator := range []string{"rss", "allocated", "erlang"} {
if x, found := v[estimator]; found {
if total, ok := x.(float64); ok {
fields["mem_total"] = int64(total)
foundEstimator = true
break
}
acc.AddError(fmt.Errorf("unknown type %T for %q total memory", x, estimator))
}
}
if !foundEstimator {
acc.AddError(fmt.Errorf("no known memory estimation in %v", v))
}
default:
acc.AddError(fmt.Errorf("unknown type %T for total memory", memory.Memory.Total))
}
}

acc.AddFields("rabbitmq_node", fields, tags)
Expand Down
Loading

0 comments on commit e2ab218

Please sign in to comment.