Skip to content

Commit

Permalink
[release-17.0] Tablet throttler: empty list of probes on non-leader (#…
Browse files Browse the repository at this point in the history
…13926) (#13952)

Signed-off-by: Shlomi Noach <[email protected]>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
Co-authored-by: Shlomi Noach <[email protected]>
  • Loading branch information
vitess-bot[bot] and shlomi-noach authored Sep 12, 2023
1 parent 0543850 commit 4f01ad8
Show file tree
Hide file tree
Showing 2 changed files with 218 additions and 6 deletions.
32 changes: 26 additions & 6 deletions go/vt/vttablet/tabletserver/throttle/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/config"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/mysql"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
"vitess.io/vitess/go/vt/vttablet/tmclient"
)

const (
Expand All @@ -57,7 +58,7 @@ const (

dormantPeriod = time.Minute
defaultThrottleTTLMinutes = 60
defaultThrottleRatio = 1.0
DefaultThrottleRatio = 1.0

shardStoreName = "shard"
selfStoreName = "self"
Expand Down Expand Up @@ -108,6 +109,14 @@ func init() {
rand.Seed(time.Now().UnixNano())
}

// throttlerTopoService is the slice of topo server functionality the throttler depends on.
// It is declared as an interface so that unit tests can plug in a mock implementation.
type throttlerTopoService interface {
	// GetSrvKeyspace returns the SrvKeyspace record for the given cell and keyspace.
	GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error)
	// FindAllTabletAliasesInShard returns tablet aliases for the given keyspace and shard.
	FindAllTabletAliasesInShard(ctx context.Context, keyspace, shard string) ([]*topodatapb.TabletAlias, error)
	// GetTablet returns the tablet information identified by the given alias.
	GetTablet(ctx context.Context, alias *topodatapb.TabletAlias) (*topo.TabletInfo, error)
}

// Throttler is the main entity in the throttling mechanism. This service runs, probes, collects data,
// aggregates, reads inventory, provides information, etc.
type Throttler struct {
Expand All @@ -123,7 +132,7 @@ type Throttler struct {
env tabletenv.Env
pool *connpool.Pool
tabletTypeFunc func() topodatapb.TabletType
ts *topo.Server
ts throttlerTopoService
srvTopoServer srvtopo.Server
heartbeatWriter heartbeat.HeartbeatWriter

Expand Down Expand Up @@ -446,7 +455,7 @@ func (throttler *Throttler) Open() error {
throttler.pool.Open(throttler.env.Config().DB.AppWithDB(), throttler.env.Config().DB.DbaWithDB(), throttler.env.Config().DB.AppDebugWithDB())
atomic.StoreInt64(&throttler.isOpen, 1)

throttler.ThrottleApp("always-throttled-app", time.Now().Add(time.Hour*24*365*10), defaultThrottleRatio)
throttler.ThrottleApp("always-throttled-app", time.Now().Add(time.Hour*24*365*10), DefaultThrottleRatio)

if throttlerConfigViaTopo {
log.Infof("Throttler: throttler-config-via-topo detected")
Expand Down Expand Up @@ -606,8 +615,11 @@ func (throttler *Throttler) Operate(ctx context.Context) {
throttledAppsTicker := addTicker(throttledAppsSnapshotInterval)
recentCheckTicker := addTicker(time.Second)

tmClient := tmclient.NewTabletManagerClient()

go func() {
defer log.Infof("Throttler: Operate terminated, tickers stopped")
defer tmClient.Close()
for _, t := range tickers {
defer t.Stop()
// since we just started the tickers now, speed up the ticks by forcing an immediate tick
Expand Down Expand Up @@ -765,8 +777,10 @@ func (throttler *Throttler) collectMySQLMetrics(ctx context.Context) error {

var throttleMetricFunc func() *mysql.MySQLThrottleMetric
if clusterName == selfStoreName {
// Throttler is probing its own tablet's metrics:
throttleMetricFunc = throttler.generateSelfMySQLThrottleMetricFunc(ctx, probe)
} else {
// Throttler probing other tablets:
throttleMetricFunc = throttler.generateTabletHTTPProbeFunction(ctx, clusterName, probe)
}
throttleMetrics := mysql.ReadThrottleMetric(probe, clusterName, throttleMetricFunc)
Expand All @@ -780,7 +794,6 @@ func (throttler *Throttler) collectMySQLMetrics(ctx context.Context) error {

// refreshMySQLInventory will re-structure the inventory based on reading config settings
func (throttler *Throttler) refreshMySQLInventory(ctx context.Context) error {

// distribute the query/threshold from the throttler down to the cluster settings and from there to the probes
metricsQuery := throttler.GetMetricsQuery()
metricsThreshold := throttler.MetricsThreshold.Load()
Expand Down Expand Up @@ -822,13 +835,20 @@ func (throttler *Throttler) refreshMySQLInventory(ctx context.Context) error {
}

if clusterName == selfStoreName {
// special case: just looking at this tablet's MySQL server
// special case: just looking at this tablet's MySQL server.
// We will probe this "cluster" (of one server) in a special way.
addInstanceKey("", 0, mysql.SelfInstanceKey, clusterName, clusterSettings, clusterProbes.InstanceProbes)
throttler.mysqlClusterProbesChan <- clusterProbes
return
}
if atomic.LoadInt64(&throttler.isLeader) == 0 {
// This tablet may have used to be the primary, but it isn't now. It may have a recollection
// of previous clusters it used to probe. It may have recollection of specific probes for such clusters.
// This now ensures any existing cluster probes are overridden with an empty list of probes.
// `clusterProbes` was created above as empty, and identifiable via `clusterName`. This will in turn
// be used to overwrite throttler.mysqlInventory.ClustersProbes[clusterProbes.ClusterName] in
// updateMySQLClusterProbes().
throttler.mysqlClusterProbesChan <- clusterProbes
// not the leader (primary tablet)? Then no more work for us.
return
}
Expand Down Expand Up @@ -934,7 +954,7 @@ func (throttler *Throttler) ThrottleApp(appName string, expireAt time.Time, rati
expireAt = now.Add(defaultThrottleTTLMinutes * time.Minute)
}
if ratio < 0 {
ratio = defaultThrottleRatio
ratio = DefaultThrottleRatio
}
appThrottle = base.NewAppThrottle(appName, expireAt, ratio)
}
Expand Down
192 changes: 192 additions & 0 deletions go/vt/vttablet/tabletserver/throttle/throttler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
Copyright 2017 GitHub Inc.
Licensed under MIT License. See https://github.com/github/freno/blob/master/LICENSE
*/

package throttle

import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"

"github.com/patrickmn/go-cache"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/config"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/mysql"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// waitForProbesTimeout bounds how long a test waits for cluster probes to arrive
// before giving up.
const waitForProbesTimeout = 30 * time.Second

// FakeTopoServer is a stateless test double; its methods return canned topology data.
type FakeTopoServer struct{}

// GetTablet returns a canned REPLICA tablet record that carries the requested alias and
// points at 127.0.0.1 (MySQL port 3306, "vt" port 5000). It never returns an error.
func (ts *FakeTopoServer) GetTablet(ctx context.Context, alias *topodatapb.TabletAlias) (*topo.TabletInfo, error) {
	const localhost = "127.0.0.1"
	record := &topodatapb.Tablet{
		Alias:         alias,
		Hostname:      localhost,
		MysqlHostname: localhost,
		MysqlPort:     3306,
		PortMap:       map[string]int32{"vt": 5000},
		Type:          topodatapb.TabletType_REPLICA,
	}
	return &topo.TabletInfo{Tablet: record}, nil
}

// FindAllTabletAliasesInShard ignores its arguments and always reports the same two
// tablet aliases: zone1/100 and zone2/101. It never returns an error.
func (ts *FakeTopoServer) FindAllTabletAliasesInShard(ctx context.Context, keyspace, shard string) ([]*topodatapb.TabletAlias, error) {
	return []*topodatapb.TabletAlias{
		{Cell: "zone1", Uid: 100},
		{Cell: "zone2", Uid: 101},
	}, nil
}

// GetSrvKeyspace ignores its arguments and returns an empty SrvKeyspace with a nil error.
func (ts *FakeTopoServer) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) {
	return &topodatapb.SrvKeyspace{}, nil
}

// FakeHeartbeatWriter is a stateless stand-in used as the throttler's heartbeat writer in tests.
type FakeHeartbeatWriter struct{}

// RequestHeartbeats is intentionally a no-op: the fake writer emits no heartbeats.
func (w FakeHeartbeatWriter) RequestHeartbeats() {}

// TestIsAppThrottled walks four apps through the throttle lifecycle:
//  1. initially, no app is reported as throttled;
//  2. after ThrottleApp, only apps with a future expiry and a non-zero ratio are reported
//     as throttled;
//  3. after UnthrottleApp, no app is reported as throttled again.
func TestIsAppThrottled(t *testing.T) {
	throttler := Throttler{
		throttledApps:   cache.New(cache.NoExpiration, 0),
		heartbeatWriter: FakeHeartbeatWriter{},
	}
	assert.False(t, throttler.IsAppThrottled("app1"))
	assert.False(t, throttler.IsAppThrottled("app2"))
	assert.False(t, throttler.IsAppThrottled("app3"))
	assert.False(t, throttler.IsAppThrottled("app4"))
	//
	throttler.ThrottleApp("app1", time.Now().Add(time.Hour), DefaultThrottleRatio)
	throttler.ThrottleApp("app2", time.Now(), DefaultThrottleRatio)
	throttler.ThrottleApp("app3", time.Now().Add(time.Hour), DefaultThrottleRatio)
	throttler.ThrottleApp("app4", time.Now().Add(time.Hour), 0)
	// app1 is throttled with exactly the same arguments as app3, so it must be reported
	// as throttled too. Without this check app1's throttled state was never verified.
	assert.True(t, throttler.IsAppThrottled("app1"))
	assert.False(t, throttler.IsAppThrottled("app2")) // expired
	assert.True(t, throttler.IsAppThrottled("app3"))
	assert.False(t, throttler.IsAppThrottled("app4")) // ratio is zero
	//
	throttler.UnthrottleApp("app1")
	throttler.UnthrottleApp("app2")
	throttler.UnthrottleApp("app3")
	throttler.UnthrottleApp("app4")
	assert.False(t, throttler.IsAppThrottled("app1"))
	assert.False(t, throttler.IsAppThrottled("app2"))
	assert.False(t, throttler.IsAppThrottled("app3"))
	assert.False(t, throttler.IsAppThrottled("app4"))
}

// TestRefreshMySQLInventory tests the behavior of the throttler's refreshMySQLInventory() function, which
// is called periodically in actual throttler. For a given cluster name, it generates a list of probes
// the throttler will use to check metrics.
// On a "self" cluster, that list is expected to probe the tablet itself.
// On any other cluster, the list is expected to be empty if non-leader (only leader throttler, on a
// `PRIMARY` tablet, probes other tablets). On the leader, the list is expected to be non-empty.
//
// The test runs three phases against the same throttler: not leader, promoted to leader, and
// demoted again. The last phase verifies that probes collected while leader are cleaned up.
func TestRefreshMySQLInventory(t *testing.T) {
	metricsQuery := "select 1"
	config.Settings().Stores.MySQL.Clusters = map[string]*config.MySQLClusterConfigurationSettings{
		selfStoreName: {},
		"ks1":         {},
		"ks2":         {},
	}
	clusters := config.Settings().Stores.MySQL.Clusters
	for _, s := range clusters {
		s.MetricQuery = metricsQuery
		s.ThrottleThreshold = &atomic.Uint64{}
		s.ThrottleThreshold.Store(1)
	}

	throttler := &Throttler{
		mysqlClusterProbesChan: make(chan *mysql.ClusterProbes),
		mysqlClusterThresholds: cache.New(cache.NoExpiration, 0),
		ts:                     &FakeTopoServer{},
		mysqlInventory:         mysql.NewInventory(),
	}
	throttler.metricsQuery.Store(metricsQuery)
	throttler.initThrottleTabletTypes()

	// validateClusterProbes drains one probes result per configured cluster from
	// mysqlClusterProbesChan, applies each to the inventory, and validates the probe counts
	// both as received and as stored in the inventory.
	validateClusterProbes := func(t *testing.T, ctx context.Context) {
		testName := fmt.Sprintf("leader=%v", atomic.LoadInt64(&throttler.isLeader))
		t.Run(testName, func(t *testing.T) {
			// validateProbesCount expects number of probes according to cluster name and throttler's leadership status
			validateProbesCount := func(t *testing.T, clusterName string, probes *mysql.Probes) {
				if clusterName == selfStoreName {
					assert.Equal(t, 1, len(*probes))
				} else if atomic.LoadInt64(&throttler.isLeader) > 0 {
					assert.NotZero(t, len(*probes))
				} else {
					assert.Empty(t, *probes)
				}
			}
			t.Run("waiting for probes", func(t *testing.T) {
				ctx, cancel := context.WithTimeout(ctx, waitForProbesTimeout)
				defer cancel()
				numClusterProbesResults := 0
				for {
					select {
					case probes := <-throttler.mysqlClusterProbesChan:
						// Worth noting that in this unit test, the throttler is _closed_. Its own Operate() function does
						// not run, and therefore there is none but us to both populate `mysqlClusterProbesChan` as well as
						// read from it. We do not compete here with any other goroutine.
						assert.NotNil(t, probes)

						throttler.updateMySQLClusterProbes(ctx, probes)

						numClusterProbesResults++
						validateProbesCount(t, probes.ClusterName, probes.InstanceProbes)

						if numClusterProbesResults == len(clusters) {
							// Achieved our goal
							return
						}
					case <-ctx.Done():
						// FailNowf aborts this subtest, which also terminates the loop.
						assert.FailNowf(t, ctx.Err().Error(), "waiting for %d cluster probes", len(clusters))
					}
				}
			})
			t.Run("validating probes", func(t *testing.T) {
				for clusterName := range clusters {
					probes, ok := throttler.mysqlInventory.ClustersProbes[clusterName]
					require.True(t, ok)
					validateProbesCount(t, clusterName, probes)
				}
			})
		})
	}
	//
	ctx := context.Background()

	t.Run("initial, not leader", func(t *testing.T) {
		atomic.StoreInt64(&throttler.isLeader, 0)
		// Do not silently drop the refresh error: a failed refresh invalidates the phase.
		require.NoError(t, throttler.refreshMySQLInventory(ctx))
		validateClusterProbes(t, ctx)
	})

	t.Run("promote", func(t *testing.T) {
		atomic.StoreInt64(&throttler.isLeader, 1)
		require.NoError(t, throttler.refreshMySQLInventory(ctx))
		validateClusterProbes(t, ctx)
	})

	t.Run("demote, expect cleanup", func(t *testing.T) {
		atomic.StoreInt64(&throttler.isLeader, 0)
		require.NoError(t, throttler.refreshMySQLInventory(ctx))
		validateClusterProbes(t, ctx)
	})
}

0 comments on commit 4f01ad8

Please sign in to comment.