Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tablet throttler: empty list of probes on non-leader #13926

Merged
merged 7 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions go/vt/vttablet/tabletserver/throttle/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ const (
ThrottleCheckSelf
)

// throttlerTopoService represents the functionality we expect from a TopoServer, abstracted so that
// it can be mocked in unit tests.
// The Throttler stores a value of this type in its `ts` field, so anything that provides these
// three methods (the real topo server, or a fake in tests) can be plugged in there.
type throttlerTopoService interface {
// GetTablet takes a tablet alias and returns a *topo.TabletInfo or an error.
// NOTE(review): presumably a lookup of the tablet's topo record — confirm against the topo package.
GetTablet(ctx context.Context, alias *topodatapb.TabletAlias) (*topo.TabletInfo, error)
// FindAllTabletAliasesInShard takes a keyspace and a shard name and returns a list of tablet
// aliases or an error.
// NOTE(review): presumably covers tablets in every cell — confirm against the topo package.
FindAllTabletAliasesInShard(ctx context.Context, keyspace, shard string) ([]*topodatapb.TabletAlias, error)
// GetSrvKeyspace takes a cell and a keyspace name and returns a *topodatapb.SrvKeyspace or an error.
GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error)
}

// Throttler is the main entity in the throttling mechanism. This service runs, probes, collects data,
// aggregates, reads inventory, provides information, etc.
type Throttler struct {
Expand All @@ -125,7 +133,7 @@ type Throttler struct {
env tabletenv.Env
pool *connpool.Pool
tabletTypeFunc func() topodatapb.TabletType
ts *topo.Server
ts throttlerTopoService
srvTopoServer srvtopo.Server
heartbeatWriter heartbeat.HeartbeatWriter

Expand Down Expand Up @@ -602,10 +610,10 @@ func (throttler *Throttler) Operate(ctx context.Context) {
recentCheckTicker := addTicker(time.Second)

tmClient := tmclient.NewTabletManagerClient()
defer tmClient.Close()

go func() {
defer log.Infof("Throttler: Operate terminated, tickers stopped")
defer tmClient.Close()
for _, t := range tickers {
defer t.Stop()
// since we just started the tickers now, speed up the ticks by forcing an immediate tick
Expand Down Expand Up @@ -786,8 +794,10 @@ func (throttler *Throttler) collectMySQLMetrics(ctx context.Context, tmClient tm

var throttleMetricFunc func() *mysql.MySQLThrottleMetric
if clusterName == selfStoreName {
// Throttler is probing its own tablet's metrics:
throttleMetricFunc = throttler.generateSelfMySQLThrottleMetricFunc(ctx, probe)
} else {
// Throttler probing other tablets:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that we had moved this to gRPC?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For backwards compatibility, we need to consider the option of a v18 PRIMARY with v17 replicas: those will not be able to serve gRPC, and therefore the v18 PRIMARY falls back to probing them via HTTP.

throttleMetricFunc = throttler.generateTabletHTTPProbeFunction(ctx, tmClient, clusterName, probe)
}
throttleMetrics := mysql.ReadThrottleMetric(probe, clusterName, throttleMetricFunc)
Expand All @@ -801,7 +811,6 @@ func (throttler *Throttler) collectMySQLMetrics(ctx context.Context, tmClient tm

// refreshMySQLInventory will re-structure the inventory based on reading config settings
func (throttler *Throttler) refreshMySQLInventory(ctx context.Context) error {

// distribute the query/threshold from the throttler down to the cluster settings and from there to the probes
metricsQuery := throttler.GetMetricsQuery()
metricsThreshold := throttler.MetricsThreshold.Load()
Expand Down Expand Up @@ -844,13 +853,20 @@ func (throttler *Throttler) refreshMySQLInventory(ctx context.Context) error {
}

if clusterName == selfStoreName {
// special case: just looking at this tablet's MySQL server
// special case: just looking at this tablet's MySQL server.
// We will probe this "cluster" (of one server) in a special way.
addInstanceKey(nil, "", 0, mysql.SelfInstanceKey, clusterName, clusterSettings, clusterProbes.InstanceProbes)
throttler.mysqlClusterProbesChan <- clusterProbes
return
}
if !throttler.isLeader.Load() {
// This tablet may have been the primary in the past, but it isn't now. It may still remember
// clusters it used to probe, as well as specific probes for those clusters.
// Sending `clusterProbes` here ensures any existing cluster probes are overridden with an empty list of probes.
// `clusterProbes` was created above as empty, and is identifiable via `clusterName`. It will in turn
// be used to overwrite throttler.mysqlInventory.ClustersProbes[clusterProbes.ClusterName] in
// updateMySQLClusterProbes().
throttler.mysqlClusterProbesChan <- clusterProbes
mattlord marked this conversation as resolved.
Show resolved Hide resolved
// not the leader (primary tablet)? Then no more work for us.
return
}
Expand Down
144 changes: 144 additions & 0 deletions go/vt/vttablet/tabletserver/throttle/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,57 @@
package throttle

import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"

"github.com/patrickmn/go-cache"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/config"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/mysql"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

const (
waitForProbesTimeout = 30 * time.Second
)

// FakeTopoServer is a stateless test double for the topo access the throttler needs.
// It carries no fields; all of its behavior lives in its methods, which return
// fixed, hard-coded answers.
type FakeTopoServer struct{}
Comment on lines +31 to +32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, you could probably use the existing faketopo package if it's easier.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried but it did not make much sense to me. I need a specific fake behavior for specific functions?


// GetTablet fabricates a tablet record for whatever alias it is given.
// The returned tablet is always a REPLICA on 127.0.0.1 with MySQL port 3306 and
// a "vt" port of 5000; only the alias varies with the input. The context is
// ignored and the error is always nil.
func (ts *FakeTopoServer) GetTablet(ctx context.Context, alias *topodatapb.TabletAlias) (*topo.TabletInfo, error) {
	const localhost = "127.0.0.1"

	fakeTablet := &topodatapb.Tablet{
		Alias:         alias,
		Hostname:      localhost,
		MysqlHostname: localhost,
		MysqlPort:     3306,
		PortMap:       map[string]int32{"vt": 5000},
		Type:          topodatapb.TabletType_REPLICA,
	}
	return &topo.TabletInfo{Tablet: fakeTablet}, nil
}

// FindAllTabletAliasesInShard returns the same two aliases (zone1/100 and
// zone2/101) regardless of the keyspace and shard requested. The context is
// ignored and the error is always nil.
func (ts *FakeTopoServer) FindAllTabletAliasesInShard(ctx context.Context, keyspace, shard string) ([]*topodatapb.TabletAlias, error) {
	return []*topodatapb.TabletAlias{
		{Cell: "zone1", Uid: 100},
		{Cell: "zone2", Uid: 101},
	}, nil
}

// GetSrvKeyspace returns a freshly allocated, empty SrvKeyspace for any cell
// and keyspace. The context is ignored and the error is always nil.
func (ts *FakeTopoServer) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) {
	return &topodatapb.SrvKeyspace{}, nil
}

// FakeHeartbeatWriter is a field-less test double that tests place in the
// Throttler's heartbeatWriter field in lieu of a real heartbeat writer.
type FakeHeartbeatWriter struct{}

Expand Down Expand Up @@ -50,6 +94,7 @@ func TestIsAppThrottled(t *testing.T) {
}

func TestIsAppExempted(t *testing.T) {

throttler := Throttler{
throttledApps: cache.New(cache.NoExpiration, 0),
heartbeatWriter: FakeHeartbeatWriter{},
Expand All @@ -75,3 +120,102 @@ func TestIsAppExempted(t *testing.T) {
throttler.UnthrottleApp("schema-tracker") // meaningless. App is statically exempted
assert.True(t, throttler.IsAppExempted("schema-tracker"))
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A short comment would still be nice here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

// TestRefreshMySQLInventory tests the behavior of the throttler's RefreshMySQLInventory() function, which
// is called periodically in actual throttler. For a given cluster name, it generates a list of probes
// the throttler will use to check metrics.
// On a "self" cluster, that list is expected to probe the tablet itself.
// On any other cluster, the list is expected to be empty if non-leader (only leader throttler, on a
// `PRIMARY` tablet, probes other tablets). On the leader, the list is expected to be non-empty.
func TestRefreshMySQLInventory(t *testing.T) {
mattlord marked this conversation as resolved.
Show resolved Hide resolved
metricsQuery := "select 1"
config.Settings().Stores.MySQL.Clusters = map[string]*config.MySQLClusterConfigurationSettings{
selfStoreName: {},
"ks1": {},
"ks2": {},
}
clusters := config.Settings().Stores.MySQL.Clusters
for _, s := range clusters {
s.MetricQuery = metricsQuery
s.ThrottleThreshold = &atomic.Uint64{}
s.ThrottleThreshold.Store(1)
}

throttler := &Throttler{
mysqlClusterProbesChan: make(chan *mysql.ClusterProbes),
mysqlClusterThresholds: cache.New(cache.NoExpiration, 0),
ts: &FakeTopoServer{},
mysqlInventory: mysql.NewInventory(),
}
throttler.metricsQuery.Store(metricsQuery)
throttler.initThrottleTabletTypes()

validateClusterProbes := func(t *testing.T, ctx context.Context) {
testName := fmt.Sprintf("leader=%t", throttler.isLeader.Load())
t.Run(testName, func(t *testing.T) {
// validateProbesCount expects the number of probes according to cluster name and throttler's leadership status
validateProbesCount := func(t *testing.T, clusterName string, probes *mysql.Probes) {
if clusterName == selfStoreName {
assert.Equal(t, 1, len(*probes))
} else if throttler.isLeader.Load() {
assert.NotZero(t, len(*probes))
} else {
assert.Empty(t, *probes)
}
}
t.Run("waiting for probes", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, waitForProbesTimeout)
defer cancel()
numClusterProbesResults := 0
for {
select {
case probes := <-throttler.mysqlClusterProbesChan:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO it's worth a comment that we're NOT competing with the main Operate() goroutine on the channel read because that goroutine only reads from the channel if the tablet's throttler IsOpen() (which it's not in the test).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment.

// Worth noting that in this unit test, the throttler is _closed_. Its own Operate() function does
// not run, and therefore there is none but us to both populate `mysqlClusterProbesChan` as well as
// read from it. We do not compete here with any other goroutine.
assert.NotNil(t, probes)

throttler.updateMySQLClusterProbes(ctx, probes)

numClusterProbesResults++
validateProbesCount(t, probes.ClusterName, probes.InstanceProbes)

if numClusterProbesResults == len(clusters) {
// Achieved our goal
return
}
case <-ctx.Done():
assert.FailNowf(t, ctx.Err().Error(), "waiting for %d cluster probes", len(clusters))
}
}
})
t.Run("validating probes", func(t *testing.T) {
for clusterName := range clusters {
probes, ok := throttler.mysqlInventory.ClustersProbes[clusterName]
require.True(t, ok)
validateProbesCount(t, clusterName, probes)
}
})
})
}
//
ctx := context.Background()

t.Run("initial, not leader", func(t *testing.T) {
throttler.isLeader.Store(false)
throttler.refreshMySQLInventory(ctx)
validateClusterProbes(t, ctx)
})

t.Run("promote", func(t *testing.T) {
throttler.isLeader.Store(true)
throttler.refreshMySQLInventory(ctx)
validateClusterProbes(t, ctx)
})

t.Run("demote, expect cleanup", func(t *testing.T) {
throttler.isLeader.Store(false)
throttler.refreshMySQLInventory(ctx)
validateClusterProbes(t, ctx)
})
}
Loading