Skip to content

Commit

Permalink
Merge #102508
Browse files Browse the repository at this point in the history
102508: ccl/sqlproxyccl: export balancer metrics r=jaylim-crl a=jaylim-crl

Previously, the balancer's metrics weren't exported, and this resulted in them
not showing up in the metrics endpoint. This commit addresses that issue.

Release note: None

Epic: none

Release justification: Metrics only change for sqlproxy.

Co-authored-by: Jay <[email protected]>
  • Loading branch information
craig[bot] and jaylim-crl committed Apr 28, 2023
2 parents f8523ec + 78da84d commit a628ff8
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 24 deletions.
4 changes: 2 additions & 2 deletions pkg/ccl/sqlproxyccl/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,7 @@ func (q *rebalancerQueue) enqueue(req *rebalanceRequest) {

e = q.queue.PushBack(req)
q.elements[req.conn] = e
q.metrics.rebalanceReqQueued.Inc(1)
q.metrics.RebalanceReqQueued.Inc(1)
q.sem.Release(1)
}

Expand Down Expand Up @@ -720,6 +720,6 @@ func (q *rebalancerQueue) dequeue(ctx context.Context) (*rebalanceRequest, error

req := q.queue.Remove(e).(*rebalanceRequest)
delete(q.elements, req.conn)
q.metrics.rebalanceReqQueued.Dec(1)
q.metrics.RebalanceReqQueued.Dec(1)
return req, nil
}
26 changes: 13 additions & 13 deletions pkg/ccl/sqlproxyccl/balancer/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestRebalancer_processQueue(t *testing.T) {
// any request.
assertNoRunningRequests := func(t *testing.T) {
testutils.SucceedsSoon(t, func() error {
runningReq := b.metrics.rebalanceReqRunning.Value()
runningReq := b.metrics.RebalanceReqRunning.Value()
if runningReq != 0 {
return errors.Newf("expected no running requests, but got %d", runningReq)
}
Expand All @@ -115,7 +115,7 @@ func TestRebalancer_processQueue(t *testing.T) {
conn: &testConnHandle{
onTransferConnection: func() error {
count++
require.Equal(t, int64(1), b.metrics.rebalanceReqRunning.Value())
require.Equal(t, int64(1), b.metrics.RebalanceReqRunning.Value())
return errors.New("cannot transfer")
},
},
Expand All @@ -139,7 +139,7 @@ func TestRebalancer_processQueue(t *testing.T) {
conn: &testConnHandle{
onTransferConnection: func() error {
count++
require.Equal(t, int64(1), b.metrics.rebalanceReqRunning.Value())
require.Equal(t, int64(1), b.metrics.RebalanceReqRunning.Value())
return nil
},
},
Expand All @@ -163,7 +163,7 @@ func TestRebalancer_processQueue(t *testing.T) {
conn: &testConnHandle{
onTransferConnection: func() error {
count++
require.Equal(t, int64(1), b.metrics.rebalanceReqRunning.Value())
require.Equal(t, int64(1), b.metrics.RebalanceReqRunning.Value())
return context.Canceled
},
},
Expand Down Expand Up @@ -217,7 +217,7 @@ func TestRebalancer_processQueue(t *testing.T) {
// concurrent rebalances defined.
newCount := atomic.AddInt32(&count, 1)
require.True(t, newCount <= 2)
require.True(t, b.metrics.rebalanceReqRunning.Value() <= 2)
require.True(t, b.metrics.RebalanceReqRunning.Value() <= 2)
return nil
},
},
Expand All @@ -233,7 +233,7 @@ func TestRebalancer_processQueue(t *testing.T) {

// We should only transfer once for every connection.
require.Equal(t, int32(0), count)
require.Equal(t, int64(reqCount), b.metrics.rebalanceReqTotal.Count())
require.Equal(t, int64(reqCount), b.metrics.RebalanceReqTotal.Count())
})
}

Expand Down Expand Up @@ -1257,13 +1257,13 @@ func TestRebalancerQueue(t *testing.T) {

// Enqueue in a specific order. req3 overrides req1; req2 is a no-op.
q.enqueue(req1)
require.Equal(t, int64(1), q.metrics.rebalanceReqQueued.Value())
require.Equal(t, int64(1), q.metrics.RebalanceReqQueued.Value())
q.enqueue(req3)
require.Equal(t, int64(1), q.metrics.rebalanceReqQueued.Value())
require.Equal(t, int64(1), q.metrics.RebalanceReqQueued.Value())
q.enqueue(req2)
require.Len(t, q.elements, 1)
require.Equal(t, 1, q.queue.Len())
require.Equal(t, int64(1), q.metrics.rebalanceReqQueued.Value())
require.Equal(t, int64(1), q.metrics.RebalanceReqQueued.Value())

// Create another request.
conn2 := &testConnHandle{}
Expand All @@ -1272,19 +1272,19 @@ func TestRebalancerQueue(t *testing.T) {
conn: conn2,
}
q.enqueue(req4)
require.Equal(t, int64(2), q.metrics.rebalanceReqQueued.Value())
require.Equal(t, int64(2), q.metrics.RebalanceReqQueued.Value())
require.Len(t, q.elements, 2)
require.Equal(t, 2, q.queue.Len())

// Dequeue the items.
item, err := q.dequeue(ctx)
require.NoError(t, err)
require.Equal(t, req3, item)
require.Equal(t, int64(1), q.metrics.rebalanceReqQueued.Value())
require.Equal(t, int64(1), q.metrics.RebalanceReqQueued.Value())
item, err = q.dequeue(ctx)
require.NoError(t, err)
require.Equal(t, req4, item)
require.Equal(t, int64(0), q.metrics.rebalanceReqQueued.Value())
require.Equal(t, int64(0), q.metrics.RebalanceReqQueued.Value())
require.Empty(t, q.elements)
require.Equal(t, 0, q.queue.Len())

Expand All @@ -1293,7 +1293,7 @@ func TestRebalancerQueue(t *testing.T) {
req4, err = q.dequeue(ctx)
require.EqualError(t, err, context.Canceled.Error())
require.Nil(t, req4)
require.Equal(t, int64(0), q.metrics.rebalanceReqQueued.Value())
require.Equal(t, int64(0), q.metrics.RebalanceReqQueued.Value())
}

// TestRebalancerQueueBlocking tests the blocking behavior when invoking
Expand Down
18 changes: 9 additions & 9 deletions pkg/ccl/sqlproxyccl/balancer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import "github.com/cockroachdb/cockroach/pkg/util/metric"
// Metrics contains pointers to the metrics for monitoring balancer-related
// operations.
type Metrics struct {
rebalanceReqRunning *metric.Gauge
rebalanceReqQueued *metric.Gauge
rebalanceReqTotal *metric.Counter
RebalanceReqRunning *metric.Gauge
RebalanceReqQueued *metric.Gauge
RebalanceReqTotal *metric.Counter
}

// MetricStruct implements the metrics.Struct interface.
Expand Down Expand Up @@ -47,19 +47,19 @@ var (
// NewMetrics instantiates the metrics holder for balancer monitoring.
func NewMetrics() *Metrics {
return &Metrics{
rebalanceReqRunning: metric.NewGauge(metaRebalanceReqRunning),
rebalanceReqQueued: metric.NewGauge(metaRebalanceReqQueued),
rebalanceReqTotal: metric.NewCounter(metaRebalanceReqTotal),
RebalanceReqRunning: metric.NewGauge(metaRebalanceReqRunning),
RebalanceReqQueued: metric.NewGauge(metaRebalanceReqQueued),
RebalanceReqTotal: metric.NewCounter(metaRebalanceReqTotal),
}
}

// processRebalanceStart indicates the start of processing a rebalance request.
func (m *Metrics) processRebalanceStart() {
m.rebalanceReqRunning.Inc(1)
m.rebalanceReqTotal.Inc(1)
m.RebalanceReqRunning.Inc(1)
m.RebalanceReqTotal.Inc(1)
}

// processRebalanceFinish indicates the end of processing a rebalance request.
func (m *Metrics) processRebalanceFinish() {
m.rebalanceReqRunning.Dec(1)
m.RebalanceReqRunning.Dec(1)
}
2 changes: 2 additions & 0 deletions pkg/ccl/sqlproxyccl/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ func TestHandleVars(t *testing.T) {
require.NoError(t, err)

require.Contains(t, string(out), "# HELP proxy_sql_conns")
require.Contains(t, string(out), "# HELP proxy_balancer_rebalance_total")
require.Contains(t, string(out), "# HELP proxy_conn_migration_attempted")
}

func TestAwaitNoConnections(t *testing.T) {
Expand Down

0 comments on commit a628ff8

Please sign in to comment.