Skip to content

Commit

Permalink
Rework Metrics Part 2
Browse files Browse the repository at this point in the history
* add MetricMetadata to all metrics. Contains name/help string.
* don't automatically add metrics to registries
* replace all Registry.X methods that create/add new metrics with standalone NewX functions
* "pseudo" metrics are a group that expands into individual metrics
* optional label pairs on registries (storeID only for now)
* registries can no longer be added to registries

A few things I would still like to do:
* make registries just hold a slice. sorting them would allow reasonable
  searching (used in tests only) and keep the vars page sorted (easier
  refresh)
* remove registries from big objects, instead just keep the metrics. The
  metric/registry pattern is not the same everywhere.
* fill in help strings and audit names. this is part 3.
  • Loading branch information
marc committed Aug 11, 2016
1 parent d6047c6 commit 3b2c560
Show file tree
Hide file tree
Showing 31 changed files with 889 additions and 664 deletions.
3 changes: 1 addition & 2 deletions gossip/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/grpcutil"
"github.com/cockroachdb/cockroach/util/log"
"github.com/cockroachdb/cockroach/util/metric"
"github.com/cockroachdb/cockroach/util/stop"
"github.com/cockroachdb/cockroach/util/timeutil"
)
Expand Down Expand Up @@ -62,7 +61,7 @@ func newClient(addr net.Addr, nodeMetrics metrics) *client {
addr: addr,
remoteHighWaterStamps: map[roachpb.NodeID]int64{},
closer: make(chan struct{}),
clientMetrics: makeMetrics(metric.NewRegistry()),
clientMetrics: makeMetrics(),
nodeMetrics: nodeMetrics,
}
}
Expand Down
52 changes: 22 additions & 30 deletions gossip/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func TestClientGossip(t *testing.T) {
local := startGossip(1, stopper, t, metric.NewRegistry())
remote := startGossip(2, stopper, t, metric.NewRegistry())
disconnected := make(chan *client, 1)
c := newClient(&remote.is.NodeAddr, makeMetrics(metric.NewRegistry()))
c := newClient(&remote.is.NodeAddr, makeMetrics())

defer func() {
stopper.Stop()
Expand Down Expand Up @@ -184,16 +184,14 @@ func TestClientGossipMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper := stop.NewStopper()
defer stopper.Stop()
localRegistry := metric.NewRegistry()
local := startGossip(1, stopper, t, localRegistry)
remoteRegistry := metric.NewRegistry()
remote := startGossip(2, stopper, t, remoteRegistry)
local := startGossip(1, stopper, t, metric.NewRegistry())
remote := startGossip(2, stopper, t, metric.NewRegistry())

gossipSucceedsSoon(
t, stopper, make(chan *client, 2),
map[*client]*Gossip{
newClient(&local.is.NodeAddr, makeMetrics(metric.NewRegistry())): remote,
newClient(&remote.is.NodeAddr, makeMetrics(metric.NewRegistry())): local,
newClient(&local.is.NodeAddr, makeMetrics()): remote,
newClient(&remote.is.NodeAddr, makeMetrics()): local,
},
func() error {
if err := local.AddInfo("local-key", nil, time.Hour); err != nil {
Expand All @@ -204,35 +202,29 @@ func TestClientGossipMetrics(t *testing.T) {
}

// Infos/Bytes Sent/Received should not be zero.
for i, reg := range []*metric.Registry{localRegistry, remoteRegistry} {
for _, ratesName := range []string{
InfosSentRatesName,
InfosReceivedRatesName,
BytesSentRatesName,
BytesReceivedRatesName,
for i, s := range []*server{local.server, remote.server} {
for _, rate := range []metric.Rates{
s.nodeMetrics.infosSent,
s.nodeMetrics.infosReceived,
s.nodeMetrics.bytesSent,
s.nodeMetrics.bytesReceived,
} {
counterName := ratesName + "-count"
counter := reg.GetCounter(counterName)
if counter == nil {
return errors.Errorf("%d: missing counter %q", i, counterName)
}
counter := rate.Counter
if count := counter.Count(); count <= 0 {
return errors.Errorf("%d: expected metrics counter %q > 0; = %d", i, counterName, count)
return errors.Errorf("%d: expected metrics counter %q > 0; = %d", i, counter.GetName(), count)
}
}
}

// Since there are two gossip nodes, there should be at least one incoming
// and outgoing connection.
for i, reg := range []*metric.Registry{localRegistry, remoteRegistry} {
for _, name := range []string{} {
gauge := reg.GetGauge(name)
if gauge == nil {
return errors.Errorf("%d: missing gauge %q", i, name)
}
if count := gauge.Value(); count <= 0 {
return errors.Errorf("%d: expected metrics gauge %q > 0; = %d", i, name, count)
}
for i, s := range []*server{local.server, remote.server} {
gauge := s.incoming.gauge
if gauge == nil {
return errors.Errorf("%d: missing gauge \"incoming\"", i)
}
if count := gauge.Value(); count <= 0 {
return errors.Errorf("%d: expected metrics gauge %q > 0; = %d", i, gauge.GetName(), count)
}
}
return nil
Expand All @@ -249,7 +241,7 @@ func TestClientNodeID(t *testing.T) {

// Use an insecure context. We're talking to TCP sockets, which are not in the certs.
rpcContext := rpc.NewContext(&base.Context{Insecure: true}, nil, stopper)
c := newClient(&remote.nodeAddr, makeMetrics(metric.NewRegistry()))
c := newClient(&remote.nodeAddr, makeMetrics())
disconnected := make(chan *client, 1)
disconnected <- c

Expand Down Expand Up @@ -494,7 +486,7 @@ func TestClientForwardUnresolved(t *testing.T) {
addr := local.is.NodeAddr
local.mu.Unlock()

client := newClient(&addr, makeMetrics(metric.NewRegistry())) // never started
client := newClient(&addr, makeMetrics()) // never started

newAddr := util.UnresolvedAddr{
NetworkField: "tcp",
Expand Down
30 changes: 15 additions & 15 deletions gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,13 @@ const (
)

// Gossip metrics counter names.
const (
ConnectionsIncomingGaugeName = "gossip.connections.incoming"
ConnectionsOutgoingGaugeName = "gossip.connections.outgoing"
InfosSentRatesName = "gossip.infos.sent"
InfosReceivedRatesName = "gossip.infos.received"
BytesSentRatesName = "gossip.bytes.sent"
BytesReceivedRatesName = "gossip.bytes.received"
var (
MetaConnectionsIncomingGauge = metric.MetricMetadata{"gossip.connections.incoming", ""}
MetaConnectionsOutgoingGauge = metric.MetricMetadata{"gossip.connections.outgoing", ""}
MetaInfosSentRates = metric.MetricMetadata{"gossip.infos.sent", ""}
MetaInfosReceivedRates = metric.MetricMetadata{"gossip.infos.received", ""}
MetaBytesSentRates = metric.MetricMetadata{"gossip.bytes.sent", ""}
MetaBytesReceivedRates = metric.MetricMetadata{"gossip.bytes.received", ""}
)

// Storage is an interface which allows the gossip instance
Expand Down Expand Up @@ -196,7 +196,7 @@ func New(rpcContext *rpc.Context, grpcServer *grpc.Server, resolvers []resolver.
Connected: make(chan struct{}),
rpcContext: rpcContext,
server: newServer(stopper, registry),
outgoing: makeNodeSet(minPeers, registry.Gauge(ConnectionsOutgoingGaugeName)),
outgoing: makeNodeSet(minPeers, metric.NewGauge(MetaConnectionsOutgoingGauge)),
bootstrapping: map[string]struct{}{},
disconnected: make(chan *client, 10),
stalledCh: make(chan struct{}, 1),
Expand All @@ -207,6 +207,7 @@ func New(rpcContext *rpc.Context, grpcServer *grpc.Server, resolvers []resolver.
resolverAddrs: map[util.UnresolvedAddr]resolver.Resolver{},
bootstrapAddrs: map[util.UnresolvedAddr]struct{}{},
}
registry.AddMetric(g.outgoing.gauge)
g.SetResolvers(resolvers)

// Add ourselves as a SystemConfig watcher.
Expand Down Expand Up @@ -1092,13 +1093,12 @@ func (m metrics) String() string {
m.infosSent.Count(), m.infosReceived.Count(), m.bytesSent.Count(), m.bytesReceived.Count())
}

// makeMetrics makes a new metrics object with rates set on the provided
// registry.
func makeMetrics(registry *metric.Registry) metrics {
// makeMetrics makes a new metrics object with rates.
func makeMetrics() metrics {
return metrics{
bytesReceived: registry.Rates(BytesReceivedRatesName),
bytesSent: registry.Rates(BytesSentRatesName),
infosReceived: registry.Rates(InfosReceivedRatesName),
infosSent: registry.Rates(InfosSentRatesName),
bytesReceived: metric.NewRates(MetaBytesReceivedRates),
bytesSent: metric.NewRates(MetaBytesSentRates),
infosReceived: metric.NewRates(MetaInfosReceivedRates),
infosSent: metric.NewRates(MetaInfosSentRates),
}
}
4 changes: 2 additions & 2 deletions gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestGossipNoForwardSelf(t *testing.T) {
}

for _, peer := range peers {
c := newClient(&local.is.NodeAddr, makeMetrics(metric.NewRegistry()))
c := newClient(&local.is.NodeAddr, makeMetrics())

util.SucceedsSoon(t, func() error {
conn, err := peer.rpcContext.GRPCDial(c.addr.String(), grpc.WithBlock())
Expand Down Expand Up @@ -193,7 +193,7 @@ func TestGossipNoForwardSelf(t *testing.T) {
peer := startGossip(roachpb.NodeID(i+local.server.incoming.maxSize+2), stopper, t, metric.NewRegistry())

for {
c := newClient(&local.is.NodeAddr, makeMetrics(metric.NewRegistry()))
c := newClient(&local.is.NodeAddr, makeMetrics())
c.start(peer, disconnectedCh, peer.rpcContext, stopper)

disconnectedClient := <-disconnectedCh
Expand Down
2 changes: 1 addition & 1 deletion gossip/infostore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func TestLeastUseful(t *testing.T) {
defer stopper.Stop()
is := newInfoStore(1, emptyAddr, stopper)

set := makeNodeSet(3, metric.NewGauge())
set := makeNodeSet(3, metric.NewGauge(metric.MetricMetadata{"", ""}))
if is.leastUseful(set) != 0 {
t.Error("not expecting a node from an empty set")
}
Expand Down
3 changes: 2 additions & 1 deletion gossip/node_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ func (as nodeSet) asSlice() []roachpb.NodeID {
// node and false to remove a node. The new nodeSet has a separate gauge object
// from the parent.
func (as nodeSet) filter(filterFn func(node roachpb.NodeID) bool) nodeSet {
avail := makeNodeSet(as.maxSize, metric.NewGauge())
avail := makeNodeSet(as.maxSize,
metric.NewGauge(metric.MetricMetadata{"TODO(marc)", "TODO(marc)"}))
for node := range as.nodes {
if filterFn(node) {
avail.addNode(node)
Expand Down
12 changes: 6 additions & 6 deletions gossip/node_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

func TestNodeSetMaxSize(t *testing.T) {
defer leaktest.AfterTest(t)()
nodes := makeNodeSet(1, metric.NewGauge())
nodes := makeNodeSet(1, metric.NewGauge(metric.MetricMetadata{"", ""}))
if !nodes.hasSpace() {
t.Error("set should have space")
}
Expand All @@ -38,7 +38,7 @@ func TestNodeSetMaxSize(t *testing.T) {

func TestNodeSetHasNode(t *testing.T) {
defer leaktest.AfterTest(t)()
nodes := makeNodeSet(2, metric.NewGauge())
nodes := makeNodeSet(2, metric.NewGauge(metric.MetricMetadata{"", ""}))
node := roachpb.NodeID(1)
if nodes.hasNode(node) {
t.Error("node wasn't added and should not be valid")
Expand All @@ -52,7 +52,7 @@ func TestNodeSetHasNode(t *testing.T) {

func TestNodeSetAddAndRemoveNode(t *testing.T) {
defer leaktest.AfterTest(t)()
nodes := makeNodeSet(2, metric.NewGauge())
nodes := makeNodeSet(2, metric.NewGauge(metric.MetricMetadata{"", ""}))
node0 := roachpb.NodeID(1)
node1 := roachpb.NodeID(2)
nodes.addNode(node0)
Expand All @@ -72,13 +72,13 @@ func TestNodeSetAddAndRemoveNode(t *testing.T) {

func TestNodeSetFilter(t *testing.T) {
defer leaktest.AfterTest(t)()
nodes1 := makeNodeSet(2, metric.NewGauge())
nodes1 := makeNodeSet(2, metric.NewGauge(metric.MetricMetadata{"", ""}))
node0 := roachpb.NodeID(1)
node1 := roachpb.NodeID(2)
nodes1.addNode(node0)
nodes1.addNode(node1)

nodes2 := makeNodeSet(1, metric.NewGauge())
nodes2 := makeNodeSet(1, metric.NewGauge(metric.MetricMetadata{"", ""}))
nodes2.addNode(node1)

filtered := nodes1.filter(func(a roachpb.NodeID) bool {
Expand All @@ -91,7 +91,7 @@ func TestNodeSetFilter(t *testing.T) {

func TestNodeSetAsSlice(t *testing.T) {
defer leaktest.AfterTest(t)()
nodes := makeNodeSet(2, metric.NewGauge())
nodes := makeNodeSet(2, metric.NewGauge(metric.MetricMetadata{"", ""}))
node0 := roachpb.NodeID(1)
node1 := roachpb.NodeID(2)
nodes.addNode(node0)
Expand Down
16 changes: 12 additions & 4 deletions gossip/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,24 @@ type server struct {

// newServer creates and returns a server struct.
func newServer(stopper *stop.Stopper, registry *metric.Registry) *server {
return &server{
s := &server{
stopper: stopper,
is: newInfoStore(0, util.UnresolvedAddr{}, stopper),
incoming: makeNodeSet(minPeers, registry.Gauge(ConnectionsIncomingGaugeName)),
incoming: makeNodeSet(minPeers, metric.NewGauge(MetaConnectionsIncomingGauge)),
nodeMap: make(map[util.UnresolvedAddr]serverInfo),
tighten: make(chan roachpb.NodeID, 1),
ready: make(chan struct{}),
nodeMetrics: makeMetrics(registry),
serverMetrics: makeMetrics(metric.NewRegistry()),
nodeMetrics: makeMetrics(),
serverMetrics: makeMetrics(),
}

registry.AddMetric(s.incoming.gauge)
registry.AddMetricGroup(s.nodeMetrics.bytesReceived)
registry.AddMetricGroup(s.nodeMetrics.bytesSent)
registry.AddMetricGroup(s.nodeMetrics.infosReceived)
registry.AddMetricGroup(s.nodeMetrics.infosSent)

return s
}

// Gossip receives gossiped information from a peer node.
Expand Down
4 changes: 2 additions & 2 deletions gossip/simulation/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func (n *Network) isNetworkConnected() bool {
func (n *Network) infosSent() int {
var count int64
for _, node := range n.Nodes {
count += node.Registry.GetCounter(gossip.InfosSentRatesName + "-count").Count()
count += node.Registry.GetCounter(gossip.MetaInfosSentRates.Name + "-count").Count()
}
return int(count)
}
Expand All @@ -260,7 +260,7 @@ func (n *Network) infosSent() int {
func (n *Network) infosReceived() int {
var count int64
for _, node := range n.Nodes {
count += node.Registry.GetCounter(gossip.InfosReceivedRatesName + "-count").Count()
count += node.Registry.GetCounter(gossip.MetaInfosReceivedRates.Name + "-count").Count()
}
return int(count)
}
38 changes: 23 additions & 15 deletions kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,26 +114,34 @@ type TxnMetrics struct {
Restarts *metric.Histogram
}

const (
abortsPrefix = "txn.aborts"
commitsPrefix = "txn.commits"
commits1PCPrefix = "txn.commits1PC"
abandonsPrefix = "txn.abandons"
durationsPrefix = "txn.durations"
restartsKey = "txn.restarts"
var (
metaAbortsRates = metric.MetricMetadata{"txn.aborts", ""}
metaCommitsRates = metric.MetricMetadata{"txn.commits", ""}
metaCommits1PCRates = metric.MetricMetadata{"txn.commits1PC", ""}
metaAbandonsRates = metric.MetricMetadata{"txn.abandons", ""}
metaDurationsHistograms = metric.MetricMetadata{"txn.durations", ""}
metaRestartsHistogram = metric.MetricMetadata{"txn.restarts", ""}
)

// NewTxnMetrics returns a new instance of TxnMetrics that contains metrics which have
// been registered with the provided Registry.
func NewTxnMetrics(registry *metric.Registry) *TxnMetrics {
return &TxnMetrics{
Aborts: registry.Rates(abortsPrefix),
Commits: registry.Rates(commitsPrefix),
Commits1PC: registry.Rates(commits1PCPrefix),
Abandons: registry.Rates(abandonsPrefix),
Durations: registry.Latency(durationsPrefix),
Restarts: registry.Histogram(restartsKey, 60*time.Second, 100, 3),
}
tm := &TxnMetrics{
Aborts: metric.NewRates(metaAbortsRates),
Commits: metric.NewRates(metaCommitsRates),
Commits1PC: metric.NewRates(metaCommits1PCRates),
Abandons: metric.NewRates(metaAbandonsRates),
Durations: metric.NewLatency(metaDurationsHistograms),
Restarts: metric.NewHistogram(metaRestartsHistogram, 60*time.Second, 100, 3),
}
registry.AddMetricGroup(tm.Aborts)
registry.AddMetricGroup(tm.Commits)
registry.AddMetricGroup(tm.Commits1PC)
registry.AddMetricGroup(tm.Abandons)
registry.AddMetricGroup(tm.Durations)
registry.AddMetric(tm.Restarts)

return tm
}

// A TxnCoordSender is an implementation of client.Sender which
Expand Down
Loading

0 comments on commit 3b2c560

Please sign in to comment.