From af850b88540e38e7213bf5a1c294ad2f6035bbc4 Mon Sep 17 00:00:00 2001 From: Kishore Nallan Date: Tue, 13 Dec 2016 19:47:20 +0530 Subject: [PATCH] Rabbitmq plugin: connection-related metrics. (#1908) * Rabbitmq plugin: connection-related metrics. * Run go fmt. --- plugins/inputs/rabbitmq/rabbitmq.go | 69 ++++++++++++++++++++++-- plugins/inputs/rabbitmq/rabbitmq_test.go | 64 ++++++++++++++++++++++ 2 files changed, 130 insertions(+), 3 deletions(-) diff --git a/plugins/inputs/rabbitmq/rabbitmq.go b/plugins/inputs/rabbitmq/rabbitmq.go index 5519ee14a9025..d1c973deaa59d 100644 --- a/plugins/inputs/rabbitmq/rabbitmq.go +++ b/plugins/inputs/rabbitmq/rabbitmq.go @@ -50,8 +50,9 @@ type RabbitMQ struct { ClientTimeout internal.Duration `toml:"client_timeout"` // InsecureSkipVerify bool - Nodes []string - Queues []string + Nodes []string + Queues []string + Connections []string Client *http.Client } @@ -135,10 +136,22 @@ type Node struct { SocketsUsed int64 `json:"sockets_used"` } +// Connection ... +type Connection struct { + Name string + State string + Vhost string + Host string + Node string + ReceiveCount int64 `json:"recv_cnt"` + SendCount int64 `json:"send_cnt"` + SendPend int64 `json:"send_pend"` +} + // gatherFunc ... type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) -var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues} +var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues, gatherConnections} var sampleConfig = ` # url = "http://localhost:15672" @@ -380,6 +393,42 @@ func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) { errChan <- nil } +func gatherConnections(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) { + // Gather information about connections + connections := make([]Connection, 0) + err := r.requestJSON("/api/connections", &connections) + if err != nil { + errChan <- err + return + } + + for _, connection := range connections { + if !r.shouldGatherConnection(connection) { + continue + } + tags := map[string]string{ + "url": r.URL, + "connection": connection.Name, + "vhost": connection.Vhost, + "host": connection.Host, + "node": connection.Node, + } + + acc.AddFields( + "rabbitmq_connection", + map[string]interface{}{ + "recv_cnt": connection.ReceiveCount, + "send_cnt": connection.SendCount, + "send_pend": connection.SendPend, + "state": connection.State, + }, + tags, + ) + } + + errChan <- nil +} + func (r *RabbitMQ) shouldGatherNode(node Node) bool { if len(r.Nodes) == 0 { return true @@ -408,6 +457,20 @@ func (r *RabbitMQ) shouldGatherQueue(queue Queue) bool { return false } +func (r *RabbitMQ) shouldGatherConnection(connection Connection) bool { + if len(r.Connections) == 0 { + return true + } + + for _, name := range r.Connections { + if name == connection.Name { + return true + } + } + + return false +} + func init() { inputs.Add("rabbitmq", func() telegraf.Input { return &RabbitMQ{ diff --git a/plugins/inputs/rabbitmq/rabbitmq_test.go b/plugins/inputs/rabbitmq/rabbitmq_test.go index 4bdc980dbd53b..bbb3dd450fec6 100644 --- a/plugins/inputs/rabbitmq/rabbitmq_test.go +++ b/plugins/inputs/rabbitmq/rabbitmq_test.go @@ -374,6 +374,57 @@ const sampleQueuesResponse = ` ] ` +const sampleConnectionsResponse = ` +[ + { + "recv_oct": 166055, + "recv_oct_details": { + "rate": 0 + }, + "send_oct": 589, + "send_oct_details": { + "rate": 0 + }, + "recv_cnt": 124, + "send_cnt": 7, + "send_pend": 0, + "state": "running", + "channels": 1, + "type": "network", + "node": "rabbit@ip-10-0-12-133", + "name": "10.0.10.8:32774 -> 10.0.12.131:5672", + "port": 5672, + "peer_port": 32774, + "host": "10.0.12.131", + "peer_host": "10.0.10.8", + "ssl": false, + "peer_cert_subject": null, + "peer_cert_issuer": null, + "peer_cert_validity": null, + "auth_mechanism": "AMQPLAIN", + "ssl_protocol": null, + "ssl_key_exchange": null, + "ssl_cipher": null, + "ssl_hash": null, + "protocol": "AMQP 0-9-1", + "user": "workers", + "vhost": "main", + "timeout": 0, + "frame_max": 131072, + "channel_max": 65535, + "client_properties": { + "product": "py-amqp", + "product_version": "1.4.7", + "capabilities": { + "connection.blocked": true, + "consumer_cancel_notify": true + } + }, + "connected_at": 1476647837266 + } +] +` + func TestRabbitMQGeneratesMetrics(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var rsp string @@ -385,6 +436,8 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) { rsp = sampleNodesResponse case "/api/queues": rsp = sampleQueuesResponse + case "/api/connections": + rsp = sampleConnectionsResponse default: panic("Cannot handle request") } @@ -441,4 +494,15 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) { } assert.True(t, acc.HasMeasurement("rabbitmq_queue")) + + assert.True(t, acc.HasMeasurement("rabbitmq_connection")) + + connection_fields := map[string]interface{}{ + "recv_cnt": int64(124), + "send_cnt": int64(7), + "send_pend": int64(0), + "state": "running", + } + + acc.AssertContainsFields(t, "rabbitmq_connection", connection_fields) }