Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for labels/filters from go-metrics #3369

Merged
merged 6 commits into from
Aug 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync"
"time"

"github.com/armon/go-metrics"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/consul/structs"
"github.com/hashicorp/consul/agent/systemd"
Expand Down Expand Up @@ -94,6 +95,9 @@ type Agent struct {
// Used for streaming logs to
LogWriter *logger.LogWriter

// In-memory sink used for collecting metrics
MemSink *metrics.InmemSink

// delegate is either a *consul.Server or *consul.Client
// depending on the configuration
delegate delegate
Expand Down Expand Up @@ -2244,5 +2248,8 @@ func (a *Agent) ReloadConfig(newCfg *Config) error {
return fmt.Errorf("Failed reloading watches: %v", err)
}

// Update filtered metrics
metrics.UpdateFilter(newCfg.Telemetry.AllowedPrefixes, newCfg.Telemetry.BlockedPrefixes)

return nil
}
15 changes: 15 additions & 0 deletions agent/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,21 @@ func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (int
}, nil
}

func (s *HTTPServer) AgentMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Fetch the ACL token, if any, and enforce agent policy.
var token string
s.parseToken(req, &token)
acl, err := s.agent.resolveToken(token)
if err != nil {
return nil, err
}
if acl != nil && !acl.AgentRead(s.agent.config.NodeName) {
return nil, errPermissionDenied
}

return s.agent.MemSink.DisplayMetrics(resp, req)
}

func (s *HTTPServer) AgentReload(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "PUT" {
resp.WriteHeader(http.StatusMethodNotAllowed)
Expand Down
28 changes: 28 additions & 0 deletions agent/agent_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,34 @@ func TestAgent_Self_ACLDeny(t *testing.T) {
})
}

func TestAgent_Metrics_ACLDeny(t *testing.T) {
t.Parallel()
a := NewTestAgent(t.Name(), TestACLConfig())
defer a.Shutdown()

t.Run("no token", func(t *testing.T) {
req, _ := http.NewRequest("GET", "/v1/agent/metrics", nil)
if _, err := a.srv.AgentSelf(nil, req); !isPermissionDenied(err) {
t.Fatalf("err: %v", err)
}
})

t.Run("agent master token", func(t *testing.T) {
req, _ := http.NewRequest("GET", "/v1/agent/metrics?token=towel", nil)
if _, err := a.srv.AgentSelf(nil, req); err != nil {
t.Fatalf("err: %v", err)
}
})

t.Run("read-only token", func(t *testing.T) {
ro := makeReadOnlyAgentACL(t, a.srv)
req, _ := http.NewRequest("GET", fmt.Sprintf("/v1/agent/metrics?token=%s", ro), nil)
if _, err := a.srv.AgentSelf(nil, req); err != nil {
t.Fatalf("err: %v", err)
}
})
}

func TestAgent_Reload(t *testing.T) {
t.Parallel()
cfg := TestConfig()
Expand Down
32 changes: 32 additions & 0 deletions agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,16 @@ type Telemetry struct {
// DisableHostname will disable hostname prefixing for all metrics
DisableHostname bool `mapstructure:"disable_hostname"`

// PrefixFilter is a list of filter rules to apply for allowing/blocking metrics
// by prefix.
PrefixFilter []string `mapstructure:"prefix_filter"`
AllowedPrefixes []string `mapstructure:"-" json:"-"`
BlockedPrefixes []string `mapstructure:"-" json:"-"`

// FilterDefault is the default for whether to allow a metric that's not
// covered by the filter.
FilterDefault *bool `mapstructure:"filter_default"`

// DogStatsdAddr is the address of a dogstatsd instance. If provided,
// metrics will be sent to that instance
DogStatsdAddr string `mapstructure:"dogstatsd_addr"`
Expand Down Expand Up @@ -937,6 +947,7 @@ func DefaultConfig() *Config {
},
Telemetry: Telemetry{
StatsitePrefix: "consul",
FilterDefault: Bool(true),
},
Meta: make(map[string]string),
SyslogFacility: "LOCAL0",
Expand Down Expand Up @@ -1461,6 +1472,21 @@ func DecodeConfig(r io.Reader) (*Config, error) {
result.EnableACLReplication = true
}

// Parse the metric filters
for _, rule := range result.Telemetry.PrefixFilter {
if rule == "" {
return nil, fmt.Errorf("Cannot have empty filter rule in prefix_filter")
}
switch rule[0] {
case '+':
result.Telemetry.AllowedPrefixes = append(result.Telemetry.AllowedPrefixes, rule[1:])
case '-':
result.Telemetry.BlockedPrefixes = append(result.Telemetry.BlockedPrefixes, rule[1:])
default:
return nil, fmt.Errorf("Filter rule must begin with either '+' or '-': %q", rule)
}
}

return &result, nil
}

Expand Down Expand Up @@ -1755,6 +1781,12 @@ func MergeConfig(a, b *Config) *Config {
if b.Telemetry.DisableHostname == true {
result.Telemetry.DisableHostname = true
}
if len(b.Telemetry.PrefixFilter) != 0 {
result.Telemetry.PrefixFilter = append(result.Telemetry.PrefixFilter, b.Telemetry.PrefixFilter...)
}
if b.Telemetry.FilterDefault != nil {
result.Telemetry.FilterDefault = b.Telemetry.FilterDefault
}
if b.Telemetry.StatsdAddr != "" {
result.Telemetry.StatsdAddr = b.Telemetry.StatsdAddr
}
Expand Down
12 changes: 12 additions & 0 deletions agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,18 @@ func TestDecodeConfig(t *testing.T) {
in: `{"telemetry":{"dogstatsd_tags":["a","b"]}}`,
c: &Config{Telemetry: Telemetry{DogStatsdTags: []string{"a", "b"}}},
},
{
in: `{"telemetry":{"filter_default":true}}`,
c: &Config{Telemetry: Telemetry{FilterDefault: Bool(true)}},
},
{
in: `{"telemetry":{"prefix_filter":["+consul.metric","-consul.othermetric"]}}`,
c: &Config{Telemetry: Telemetry{
PrefixFilter: []string{"+consul.metric", "-consul.othermetric"},
AllowedPrefixes: []string{"consul.metric"},
BlockedPrefixes: []string{"consul.othermetric"},
}},
},
{
in: `{"telemetry":{"statsd_address":"a"}}`,
c: &Config{Telemetry: Telemetry{StatsdAddr: "a"}},
Expand Down
9 changes: 6 additions & 3 deletions agent/consul/catalog_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,15 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru

// Provide some metrics
if err == nil {
metrics.IncrCounter([]string{"consul", "catalog", "service", "query", args.ServiceName}, 1)
metrics.IncrCounterWithLabels([]string{"consul", "catalog", "service", "query"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
if args.ServiceTag != "" {
metrics.IncrCounter([]string{"consul", "catalog", "service", "query-tag", args.ServiceName, args.ServiceTag}, 1)
metrics.IncrCounterWithLabels([]string{"consul", "catalog", "service", "query-tag"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}, {Name: "tag", Value: args.ServiceTag}})
}
if len(reply.ServiceNodes) == 0 {
metrics.IncrCounter([]string{"consul", "catalog", "service", "not-found", args.ServiceName}, 1)
metrics.IncrCounterWithLabels([]string{"consul", "catalog", "service", "not-found"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
}
}
return err
Expand Down
15 changes: 10 additions & 5 deletions agent/consul/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} {
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "kvs", string(req.Op)}, time.Now())
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "kvs"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
switch req.Op {
case api.KVSet:
return c.state.KVSSet(index, &req.DirEnt)
Expand Down Expand Up @@ -216,7 +217,8 @@ func (c *consulFSM) applySessionOperation(buf []byte, index uint64) interface{}
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "session", string(req.Op)}, time.Now())
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "session"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
switch req.Op {
case structs.SessionCreate:
if err := c.state.SessionCreate(index, &req.Session); err != nil {
Expand All @@ -236,7 +238,8 @@ func (c *consulFSM) applyACLOperation(buf []byte, index uint64) interface{} {
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "acl", string(req.Op)}, time.Now())
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "acl"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
switch req.Op {
case structs.ACLBootstrapInit:
enabled, err := c.state.ACLBootstrapInit(index)
Expand Down Expand Up @@ -267,7 +270,8 @@ func (c *consulFSM) applyTombstoneOperation(buf []byte, index uint64) interface{
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "tombstone", string(req.Op)}, time.Now())
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "tombstone"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
switch req.Op {
case structs.TombstoneReap:
return c.state.ReapTombstones(req.ReapIndex)
Expand Down Expand Up @@ -301,7 +305,8 @@ func (c *consulFSM) applyPreparedQueryOperation(buf []byte, index uint64) interf
panic(fmt.Errorf("failed to decode request: %v", err))
}

defer metrics.MeasureSince([]string{"consul", "fsm", "prepared-query", string(req.Op)}, time.Now())
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "prepared-query"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
switch req.Op {
case structs.PreparedQueryCreate, structs.PreparedQueryUpdate:
return c.state.PreparedQuerySet(index, req.Query)
Expand Down
9 changes: 6 additions & 3 deletions agent/consul/health_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,15 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc

// Provide some metrics
if err == nil {
metrics.IncrCounter([]string{"consul", "health", "service", "query", args.ServiceName}, 1)
metrics.IncrCounterWithLabels([]string{"consul", "health", "service", "query"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
if args.ServiceTag != "" {
metrics.IncrCounter([]string{"consul", "health", "service", "query-tag", args.ServiceName, args.ServiceTag}, 1)
metrics.IncrCounterWithLabels([]string{"consul", "health", "service", "query-tag"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}, {Name: "tag", Value: args.ServiceTag}})
}
if len(reply.Nodes) == 0 {
metrics.IncrCounter([]string{"consul", "health", "service", "not-found", args.ServiceName}, 1)
metrics.IncrCounterWithLabels([]string{"consul", "health", "service", "not-found"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
}
}
return err
Expand Down
3 changes: 2 additions & 1 deletion agent/consul/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,8 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{
return structs.ErrNoDCPath
}

metrics.IncrCounter([]string{"consul", "rpc", "cross-dc", dc}, 1)
metrics.IncrCounterWithLabels([]string{"consul", "rpc", "cross-dc"}, 1,
[]metrics.Label{{Name: "datacenter", Value: dc}})
if err := s.connPool.RPC(dc, server.Addr, server.Version, method, server.UseTLS, args, reply); err != nil {
manager.NotifyFailedServer(server)
s.logger.Printf("[ERR] consul: RPC failed to server %s in DC %q: %v", server.Addr, dc, err)
Expand Down
6 changes: 4 additions & 2 deletions agent/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ START:
func (d *DNSServer) handlePtr(resp dns.ResponseWriter, req *dns.Msg) {
q := req.Question[0]
defer func(s time.Time) {
metrics.MeasureSince([]string{"consul", "dns", "ptr_query", d.agent.config.NodeName}, s)
metrics.MeasureSinceWithLabels([]string{"consul", "dns", "ptr_query"}, s,
[]metrics.Label{{Name: "node", Value: d.agent.config.NodeName}})
d.logger.Printf("[DEBUG] dns: request for %v (%v) from client %s (%s)",
q, time.Now().Sub(s), resp.RemoteAddr().String(),
resp.RemoteAddr().Network())
Expand Down Expand Up @@ -187,7 +188,8 @@ func (d *DNSServer) handlePtr(resp dns.ResponseWriter, req *dns.Msg) {
func (d *DNSServer) handleQuery(resp dns.ResponseWriter, req *dns.Msg) {
q := req.Question[0]
defer func(s time.Time) {
metrics.MeasureSince([]string{"consul", "dns", "domain_query", d.agent.config.NodeName}, s)
metrics.MeasureSinceWithLabels([]string{"consul", "dns", "domain_query"}, s,
[]metrics.Label{{Name: "node", Value: d.agent.config.NodeName}})
d.logger.Printf("[DEBUG] dns: request for %v (%v) from client %s (%s)",
q, time.Now().Sub(s), resp.RemoteAddr().String(),
resp.RemoteAddr().Network())
Expand Down
2 changes: 2 additions & 0 deletions agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (s *HTTPServer) handler(enableDebug bool) http.Handler {

// Register the wrapper, which will close over the expensive-to-compute
// parts from above.
// TODO (kyhavlov): Convert this to utilize metric labels in a major release
wrapper := func(resp http.ResponseWriter, req *http.Request) {
start := time.Now()
handler(resp, req)
Expand Down Expand Up @@ -97,6 +98,7 @@ func (s *HTTPServer) handler(enableDebug bool) http.Handler {
handleFuncMetrics("/v1/agent/maintenance", s.wrap(s.AgentNodeMaintenance))
handleFuncMetrics("/v1/agent/reload", s.wrap(s.AgentReload))
handleFuncMetrics("/v1/agent/monitor", s.wrap(s.AgentMonitor))
handleFuncMetrics("/v1/agent/metrics", s.wrap(s.AgentMetrics))
handleFuncMetrics("/v1/agent/services", s.wrap(s.AgentServices))
handleFuncMetrics("/v1/agent/checks", s.wrap(s.AgentChecks))
handleFuncMetrics("/v1/agent/members", s.wrap(s.AgentMembers))
Expand Down
53 changes: 53 additions & 0 deletions api/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,42 @@ type AgentToken struct {
Token string
}

// Metrics info is used to store different types of metric values from the agent.
type MetricsInfo struct {
Timestamp string
Gauges []GaugeValue
Points []PointValue
Counters []SampledValue
Samples []SampledValue
}

// GaugeValue stores one value that is updated as time goes on, such as
// the amount of memory allocated.
type GaugeValue struct {
Name string
Value float32
Labels map[string]string
}

// PointValue holds a series of points for a metric.
type PointValue struct {
Name string
Points []float32
}

// SampledValue stores info about a metric that is incremented over time,
// such as the number of requests to an HTTP endpoint.
type SampledValue struct {
Name string
Count int
Sum float64
Min float64
Max float64
Mean float64
Stddev float64
Labels map[string]string
}

// Agent can be used to query the Agent endpoints
type Agent struct {
c *Client
Expand Down Expand Up @@ -126,6 +162,23 @@ func (a *Agent) Self() (map[string]map[string]interface{}, error) {
return out, nil
}

// Metrics is used to query the agent we are speaking to for
// its current internal metric data
func (a *Agent) Metrics() (*MetricsInfo, error) {
r := a.c.newRequest("GET", "/v1/agent/metrics")
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()

var out *MetricsInfo
if err := decodeBody(resp, &out); err != nil {
return nil, err
}
return out, nil
}

// Reload triggers a configuration reload for the agent we are connected to.
func (a *Agent) Reload() error {
r := a.c.newRequest("PUT", "/v1/agent/reload")
Expand Down
21 changes: 21 additions & 0 deletions api/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,27 @@ func TestAPI_AgentSelf(t *testing.T) {
}
}

func TestAPI_AgentMetrics(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()

agent := c.Agent()

metrics, err := agent.Metrics()
if err != nil {
t.Fatalf("err: %v", err)
}

if len(metrics.Gauges) < 0 {
t.Fatalf("bad: %v", metrics)
}

if metrics.Gauges[0].Name != "consul.runtime.alloc_bytes" {
t.Fatalf("bad: %v", metrics.Gauges[0])
}
}

func TestAPI_AgentReload(t *testing.T) {
t.Parallel()

Expand Down
Loading