Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix RabbitMQ regression in #9383 #9443

Merged
merged 19 commits into from
Jun 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions plugins/inputs/rabbitmq/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ For additional details reference the [RabbitMQ Management HTTP Stats][management
## specified, metrics for all exchanges are gathered.
# exchanges = ["telegraf"]

## Metrics to include and exclude. Globs accepted.
## Note that an empty array for both will include all metrics
## Currently the following metrics are supported: "exchange", "federation", "node", "overview", "queue"
# metric_include = []
# metric_exclude = []

## Queues to include and exclude. Globs accepted.
## Note that an empty array for both will include all queues
# queue_name_include = []
Expand Down
198 changes: 137 additions & 61 deletions plugins/inputs/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rabbitmq
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"sync"
Expand Down Expand Up @@ -47,15 +48,18 @@ type RabbitMQ struct {
Queues []string `toml:"queues"`
Exchanges []string `toml:"exchanges"`

MetricInclude []string `toml:"metric_include"`
MetricExclude []string `toml:"metric_exclude"`
QueueInclude []string `toml:"queue_name_include"`
QueueExclude []string `toml:"queue_name_exclude"`
FederationUpstreamInclude []string `toml:"federation_upstream_include"`
FederationUpstreamExclude []string `toml:"federation_upstream_exclude"`

Client *http.Client `toml:"-"`
Log telegraf.Logger `toml:"-"`

filterCreated bool
client *http.Client
excludeEveryQueue bool
metricFilter filter.Filter
queueFilter filter.Filter
upstreamFilter filter.Filter
}
Expand Down Expand Up @@ -163,11 +167,11 @@ type Node struct {
GcNumDetails Details `json:"gc_num_details"`
GcBytesReclaimed int64 `json:"gc_bytes_reclaimed"`
GcBytesReclaimedDetails Details `json:"gc_bytes_reclaimed_details"`
IoReadAvgTime int64 `json:"io_read_avg_time"`
IoReadAvgTime float64 `json:"io_read_avg_time"`
IoReadAvgTimeDetails Details `json:"io_read_avg_time_details"`
IoReadBytes int64 `json:"io_read_bytes"`
IoReadBytesDetails Details `json:"io_read_bytes_details"`
IoWriteAvgTime int64 `json:"io_write_avg_time"`
IoWriteAvgTime float64 `json:"io_write_avg_time"`
IoWriteAvgTimeDetails Details `json:"io_write_avg_time_details"`
IoWriteBytes int64 `json:"io_write_bytes"`
IoWriteBytesDetails Details `json:"io_write_bytes_details"`
Expand Down Expand Up @@ -226,32 +230,44 @@ type MemoryResponse struct {

// Memory details
type Memory struct {
ConnectionReaders int64 `json:"connection_readers"`
ConnectionWriters int64 `json:"connection_writers"`
ConnectionChannels int64 `json:"connection_channels"`
ConnectionOther int64 `json:"connection_other"`
QueueProcs int64 `json:"queue_procs"`
QueueSlaveProcs int64 `json:"queue_slave_procs"`
Plugins int64 `json:"plugins"`
OtherProc int64 `json:"other_proc"`
Metrics int64 `json:"metrics"`
MgmtDb int64 `json:"mgmt_db"`
Mnesia int64 `json:"mnesia"`
OtherEts int64 `json:"other_ets"`
Binary int64 `json:"binary"`
MsgIndex int64 `json:"msg_index"`
Code int64 `json:"code"`
Atom int64 `json:"atom"`
OtherSystem int64 `json:"other_system"`
AllocatedUnused int64 `json:"allocated_unused"`
ReservedUnallocated int64 `json:"reserved_unallocated"`
Total int64 `json:"total"`
ConnectionReaders int64 `json:"connection_readers"`
ConnectionWriters int64 `json:"connection_writers"`
ConnectionChannels int64 `json:"connection_channels"`
ConnectionOther int64 `json:"connection_other"`
QueueProcs int64 `json:"queue_procs"`
QueueSlaveProcs int64 `json:"queue_slave_procs"`
Plugins int64 `json:"plugins"`
OtherProc int64 `json:"other_proc"`
Metrics int64 `json:"metrics"`
MgmtDb int64 `json:"mgmt_db"`
Mnesia int64 `json:"mnesia"`
OtherEts int64 `json:"other_ets"`
Binary int64 `json:"binary"`
MsgIndex int64 `json:"msg_index"`
Code int64 `json:"code"`
Atom int64 `json:"atom"`
OtherSystem int64 `json:"other_system"`
AllocatedUnused int64 `json:"allocated_unused"`
ReservedUnallocated int64 `json:"reserved_unallocated"`
Total interface{} `json:"total"`
}

// Error response
type ErrorResponse struct {
Error string `json:"error"`
Reason string `json:"reason"`
}

// gatherFunc ...
type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator)

var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues, gatherExchanges, gatherFederationLinks}
var gatherFunctions = map[string]gatherFunc{
"exchange": gatherExchanges,
"federation": gatherFederationLinks,
"node": gatherNodes,
"overview": gatherOverview,
"queue": gatherQueues,
}

var sampleConfig = `
## Management Plugin url. (default: http://localhost:15672)
Expand Down Expand Up @@ -291,6 +307,12 @@ var sampleConfig = `
## specified, metrics for all exchanges are gathered.
# exchanges = ["telegraf"]

## Metrics to include and exclude. Globs accepted.
## Note that an empty array for both will include all metrics
## Currently the following metrics are supported: "exchange", "federation", "node", "overview", "queue"
# metric_include = []
# metric_exclude = []

## Queues to include and exclude. Globs accepted.
## Note that an empty array for both will include all queues
queue_name_include = []
Expand Down Expand Up @@ -323,39 +345,47 @@ func (r *RabbitMQ) Description() string {
return "Reads metrics from RabbitMQ servers via the Management Plugin"
}

// Gather ...
func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error {
if r.Client == nil {
tlsCfg, err := r.ClientConfig.TLSConfig()
if err != nil {
return err
}
tr := &http.Transport{
ResponseHeaderTimeout: time.Duration(r.ResponseHeaderTimeout),
TLSClientConfig: tlsCfg,
}
r.Client = &http.Client{
Transport: tr,
Timeout: time.Duration(r.ClientTimeout),
}
func (r *RabbitMQ) Init() error {
var err error

// Create gather filters
if err := r.createQueueFilter(); err != nil {
return err
}
if err := r.createUpstreamFilter(); err != nil {
return err
}

// Create gather filters if not already created
if !r.filterCreated {
err := r.createQueueFilter()
if err != nil {
return err
}
err = r.createUpstreamFilter()
if err != nil {
return err
}
r.filterCreated = true
// Create a filter for the metrics
if r.metricFilter, err = filter.NewIncludeExcludeFilter(r.MetricInclude, r.MetricExclude); err != nil {
return err
}

tlsCfg, err := r.ClientConfig.TLSConfig()
if err != nil {
return err
}
tr := &http.Transport{
ResponseHeaderTimeout: time.Duration(r.ResponseHeaderTimeout),
TLSClientConfig: tlsCfg,
}
r.client = &http.Client{
Transport: tr,
Timeout: time.Duration(r.ClientTimeout),
}

return nil
}

// Gather ...
func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
wg.Add(len(gatherFunctions))
for _, f := range gatherFunctions {
for name, f := range gatherFunctions {
// Query only metrics that are supported
if !r.metricFilter.Match(name) {
continue
}
wg.Add(1)
go func(gf gatherFunc) {
defer wg.Done()
gf(r, acc)
Expand All @@ -366,15 +396,16 @@ func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error {
return nil
}

func (r *RabbitMQ) requestJSON(u string, target interface{}) error {
func (r *RabbitMQ) requestEndpoint(u string) ([]byte, error) {
if r.URL == "" {
r.URL = DefaultURL
}
u = fmt.Sprintf("%s%s", r.URL, u)
endpoint := r.URL + u
r.Log.Debugf("Requesting %q...", endpoint)

req, err := http.NewRequest("GET", u, nil)
req, err := http.NewRequest("GET", endpoint, nil)
if err != nil {
return err
return nil, err
}

username := r.Username
Expand All @@ -389,14 +420,39 @@ func (r *RabbitMQ) requestJSON(u string, target interface{}) error {

req.SetBasicAuth(username, password)

resp, err := r.Client.Do(req)
resp, err := r.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

r.Log.Debugf("HTTP status code: %v %v", resp.StatusCode, http.StatusText(resp.StatusCode))
if resp.StatusCode < 200 || resp.StatusCode > 299 {
return nil, fmt.Errorf("getting %q failed: %v %v", u, resp.StatusCode, http.StatusText(resp.StatusCode))
}

return ioutil.ReadAll(resp.Body)
}

func (r *RabbitMQ) requestJSON(u string, target interface{}) error {
buf, err := r.requestEndpoint(u)
if err != nil {
return err
}
if err := json.Unmarshal(buf, target); err != nil {
if _, ok := err.(*json.UnmarshalTypeError); ok {
// Try to get the error reason from the response
var errResponse ErrorResponse
if json.Unmarshal(buf, &errResponse) == nil && errResponse.Error != "" {
// Return the error reason in the response
return fmt.Errorf("error response trying to get %q: %q (reason: %q)", u, errResponse.Error, errResponse.Reason)
}
}

defer resp.Body.Close()
return fmt.Errorf("decoding answer from %q failed: %v", u, err)
}

return json.NewDecoder(resp.Body).Decode(target)
return nil
}

func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
Expand Down Expand Up @@ -533,7 +589,27 @@ func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
fields["mem_other_system"] = memory.Memory.OtherSystem
fields["mem_allocated_unused"] = memory.Memory.AllocatedUnused
fields["mem_reserved_unallocated"] = memory.Memory.ReservedUnallocated
fields["mem_total"] = memory.Memory.Total
switch v := memory.Memory.Total.(type) {
case float64:
fields["mem_total"] = int64(v)
case map[string]interface{}:
srebhan marked this conversation as resolved.
Show resolved Hide resolved
var foundEstimator bool
for _, estimator := range []string{"rss", "allocated", "erlang"} {
if x, found := v[estimator]; found {
if total, ok := x.(float64); ok {
fields["mem_total"] = int64(total)
foundEstimator = true
break
}
acc.AddError(fmt.Errorf("unknown type %T for %q total memory", x, estimator))
}
}
if !foundEstimator {
acc.AddError(fmt.Errorf("no known memory estimation in %v", v))
}
default:
acc.AddError(fmt.Errorf("unknown type %T for total memory", memory.Memory.Total))
}
}

acc.AddFields("rabbitmq_node", fields, tags)
Expand Down
Loading