Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-17.0] Tablet throttler: empty list of probes on non-leader (#13926) #13952

Merged
merged 3 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 26 additions & 6 deletions go/vt/vttablet/tabletserver/throttle/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/config"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/mysql"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
"vitess.io/vitess/go/vt/vttablet/tmclient"
)

const (
Expand All @@ -57,7 +58,7 @@ const (

dormantPeriod = time.Minute
defaultThrottleTTLMinutes = 60
defaultThrottleRatio = 1.0
DefaultThrottleRatio = 1.0

shardStoreName = "shard"
selfStoreName = "self"
Expand Down Expand Up @@ -108,6 +109,14 @@ func init() {
rand.Seed(time.Now().UnixNano())
}

// throttlerTopoService represents the functionality we expect from a TopoServer, abstracted so that
// it can be mocked in unit tests
type throttlerTopoService interface {
GetTablet(ctx context.Context, alias *topodatapb.TabletAlias) (*topo.TabletInfo, error)
FindAllTabletAliasesInShard(ctx context.Context, keyspace, shard string) ([]*topodatapb.TabletAlias, error)
GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error)
}

// Throttler is the main entity in the throttling mechanism. This service runs, probes, collects data,
// aggregates, reads inventory, provides information, etc.
type Throttler struct {
Expand All @@ -123,7 +132,7 @@ type Throttler struct {
env tabletenv.Env
pool *connpool.Pool
tabletTypeFunc func() topodatapb.TabletType
ts *topo.Server
ts throttlerTopoService
srvTopoServer srvtopo.Server
heartbeatWriter heartbeat.HeartbeatWriter

Expand Down Expand Up @@ -446,7 +455,7 @@ func (throttler *Throttler) Open() error {
throttler.pool.Open(throttler.env.Config().DB.AppWithDB(), throttler.env.Config().DB.DbaWithDB(), throttler.env.Config().DB.AppDebugWithDB())
atomic.StoreInt64(&throttler.isOpen, 1)

throttler.ThrottleApp("always-throttled-app", time.Now().Add(time.Hour*24*365*10), defaultThrottleRatio)
throttler.ThrottleApp("always-throttled-app", time.Now().Add(time.Hour*24*365*10), DefaultThrottleRatio)

if throttlerConfigViaTopo {
log.Infof("Throttler: throttler-config-via-topo detected")
Expand Down Expand Up @@ -606,8 +615,11 @@ func (throttler *Throttler) Operate(ctx context.Context) {
throttledAppsTicker := addTicker(throttledAppsSnapshotInterval)
recentCheckTicker := addTicker(time.Second)

tmClient := tmclient.NewTabletManagerClient()

go func() {
defer log.Infof("Throttler: Operate terminated, tickers stopped")
defer tmClient.Close()
for _, t := range tickers {
defer t.Stop()
// since we just started the tickers now, speed up the ticks by forcing an immediate tick
Expand Down Expand Up @@ -765,8 +777,10 @@ func (throttler *Throttler) collectMySQLMetrics(ctx context.Context) error {

var throttleMetricFunc func() *mysql.MySQLThrottleMetric
if clusterName == selfStoreName {
// Throttler is probing its own tablet's metrics:
throttleMetricFunc = throttler.generateSelfMySQLThrottleMetricFunc(ctx, probe)
} else {
// Throttler probing other tablets:
throttleMetricFunc = throttler.generateTabletHTTPProbeFunction(ctx, clusterName, probe)
}
throttleMetrics := mysql.ReadThrottleMetric(probe, clusterName, throttleMetricFunc)
Expand All @@ -780,7 +794,6 @@ func (throttler *Throttler) collectMySQLMetrics(ctx context.Context) error {

// refreshMySQLInventory will re-structure the inventory based on reading config settings
func (throttler *Throttler) refreshMySQLInventory(ctx context.Context) error {

// distribute the query/threshold from the throttler down to the cluster settings and from there to the probes
metricsQuery := throttler.GetMetricsQuery()
metricsThreshold := throttler.MetricsThreshold.Load()
Expand Down Expand Up @@ -822,13 +835,20 @@ func (throttler *Throttler) refreshMySQLInventory(ctx context.Context) error {
}

if clusterName == selfStoreName {
// special case: just looking at this tablet's MySQL server
// special case: just looking at this tablet's MySQL server.
// We will probe this "cluster" (of one server) is a special way.
addInstanceKey("", 0, mysql.SelfInstanceKey, clusterName, clusterSettings, clusterProbes.InstanceProbes)
throttler.mysqlClusterProbesChan <- clusterProbes
return
}
if atomic.LoadInt64(&throttler.isLeader) == 0 {
// This tablet may have used to be the primary, but it isn't now. It may have a recollection
// of previous clusters it used to probe. It may have recollection of specific probes for such clusters.
// This now ensures any existing cluster probes are overrridden with an empty list of probes.
// `clusterProbes` was created above as empty, and identificable via `clusterName`. This will in turn
// be used to overwrite throttler.mysqlInventory.ClustersProbes[clusterProbes.ClusterName] in
// updateMySQLClusterProbes().
throttler.mysqlClusterProbesChan <- clusterProbes
// not the leader (primary tablet)? Then no more work for us.
return
}
Expand Down Expand Up @@ -934,7 +954,7 @@ func (throttler *Throttler) ThrottleApp(appName string, expireAt time.Time, rati
expireAt = now.Add(defaultThrottleTTLMinutes * time.Minute)
}
if ratio < 0 {
ratio = defaultThrottleRatio
ratio = DefaultThrottleRatio
}
appThrottle = base.NewAppThrottle(appName, expireAt, ratio)
}
Expand Down
192 changes: 192 additions & 0 deletions go/vt/vttablet/tabletserver/throttle/throttler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
Copyright 2017 GitHub Inc.

Licensed under MIT License. See https://github.com/github/freno/blob/master/LICENSE
*/

package throttle

import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"

"github.com/patrickmn/go-cache"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/config"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/mysql"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

const (
waitForProbesTimeout = 30 * time.Second
)

type FakeTopoServer struct {
}

func (ts *FakeTopoServer) GetTablet(ctx context.Context, alias *topodatapb.TabletAlias) (*topo.TabletInfo, error) {
tablet := &topo.TabletInfo{
Tablet: &topodatapb.Tablet{
Alias: alias,
Hostname: "127.0.0.1",
MysqlHostname: "127.0.0.1",
MysqlPort: 3306,
PortMap: map[string]int32{"vt": 5000},
Type: topodatapb.TabletType_REPLICA,
},
}
return tablet, nil
}

func (ts *FakeTopoServer) FindAllTabletAliasesInShard(ctx context.Context, keyspace, shard string) ([]*topodatapb.TabletAlias, error) {
aliases := []*topodatapb.TabletAlias{
{Cell: "zone1", Uid: 100},
{Cell: "zone2", Uid: 101},
}
return aliases, nil
}

func (ts *FakeTopoServer) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) {
ks := &topodatapb.SrvKeyspace{}
return ks, nil
}

type FakeHeartbeatWriter struct {
}

func (w FakeHeartbeatWriter) RequestHeartbeats() {
}

func TestIsAppThrottled(t *testing.T) {
throttler := Throttler{
throttledApps: cache.New(cache.NoExpiration, 0),
heartbeatWriter: FakeHeartbeatWriter{},
}
assert.False(t, throttler.IsAppThrottled("app1"))
assert.False(t, throttler.IsAppThrottled("app2"))
assert.False(t, throttler.IsAppThrottled("app3"))
assert.False(t, throttler.IsAppThrottled("app4"))
//
throttler.ThrottleApp("app1", time.Now().Add(time.Hour), DefaultThrottleRatio)
throttler.ThrottleApp("app2", time.Now(), DefaultThrottleRatio)
throttler.ThrottleApp("app3", time.Now().Add(time.Hour), DefaultThrottleRatio)
throttler.ThrottleApp("app4", time.Now().Add(time.Hour), 0)
assert.False(t, throttler.IsAppThrottled("app2")) // expired
assert.True(t, throttler.IsAppThrottled("app3"))
assert.False(t, throttler.IsAppThrottled("app4")) // ratio is zero
//
throttler.UnthrottleApp("app1")
throttler.UnthrottleApp("app2")
throttler.UnthrottleApp("app3")
throttler.UnthrottleApp("app4")
assert.False(t, throttler.IsAppThrottled("app1"))
assert.False(t, throttler.IsAppThrottled("app2"))
assert.False(t, throttler.IsAppThrottled("app3"))
assert.False(t, throttler.IsAppThrottled("app4"))
}

// TestRefreshMySQLInventory tests the behavior of the throttler's RefreshMySQLInventory() function, which
// is called periodically in actual throttler. For a given cluster name, it generates a list of probes
// the throttler will use to check metrics.
// On a "self" cluster, that list is expect to probe the tablet itself.
// On any other cluster, the list is expected to be empty if non-leader (only leader throttler, on a
// `PRIMARY` tablet, probes other tablets). On the leader, the list is expected to be non-empty.
func TestRefreshMySQLInventory(t *testing.T) {
metricsQuery := "select 1"
config.Settings().Stores.MySQL.Clusters = map[string]*config.MySQLClusterConfigurationSettings{
selfStoreName: {},
"ks1": {},
"ks2": {},
}
clusters := config.Settings().Stores.MySQL.Clusters
for _, s := range clusters {
s.MetricQuery = metricsQuery
s.ThrottleThreshold = &atomic.Uint64{}
s.ThrottleThreshold.Store(1)
}

throttler := &Throttler{
mysqlClusterProbesChan: make(chan *mysql.ClusterProbes),
mysqlClusterThresholds: cache.New(cache.NoExpiration, 0),
ts: &FakeTopoServer{},
mysqlInventory: mysql.NewInventory(),
}
throttler.metricsQuery.Store(metricsQuery)
throttler.initThrottleTabletTypes()

validateClusterProbes := func(t *testing.T, ctx context.Context) {
testName := fmt.Sprintf("leader=%v", atomic.LoadInt64(&throttler.isLeader))
t.Run(testName, func(t *testing.T) {
// validateProbesCount expectes number of probes according to cluster name and throttler's leadership status
validateProbesCount := func(t *testing.T, clusterName string, probes *mysql.Probes) {
if clusterName == selfStoreName {
assert.Equal(t, 1, len(*probes))
} else if atomic.LoadInt64(&throttler.isLeader) > 0 {
assert.NotZero(t, len(*probes))
} else {
assert.Empty(t, *probes)
}
}
t.Run("waiting for probes", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, waitForProbesTimeout)
defer cancel()
numClusterProbesResults := 0
for {
select {
case probes := <-throttler.mysqlClusterProbesChan:
// Worth noting that in this unit test, the throttler is _closed_. Its own Operate() function does
// not run, and therefore there is none but us to both populate `mysqlClusterProbesChan` as well as
// read from it. We do not compete here with any other goroutine.
assert.NotNil(t, probes)

throttler.updateMySQLClusterProbes(ctx, probes)

numClusterProbesResults++
validateProbesCount(t, probes.ClusterName, probes.InstanceProbes)

if numClusterProbesResults == len(clusters) {
// Achieved our goal
return
}
case <-ctx.Done():
assert.FailNowf(t, ctx.Err().Error(), "waiting for %d cluster probes", len(clusters))
}
}
})
t.Run("validating probes", func(t *testing.T) {
for clusterName := range clusters {
probes, ok := throttler.mysqlInventory.ClustersProbes[clusterName]
require.True(t, ok)
validateProbesCount(t, clusterName, probes)
}
})
})
}
//
ctx := context.Background()

t.Run("initial, not leader", func(t *testing.T) {
atomic.StoreInt64(&throttler.isLeader, 0)
throttler.refreshMySQLInventory(ctx)
validateClusterProbes(t, ctx)
})

t.Run("promote", func(t *testing.T) {
atomic.StoreInt64(&throttler.isLeader, 1)
throttler.refreshMySQLInventory(ctx)
validateClusterProbes(t, ctx)
})

t.Run("demote, expect cleanup", func(t *testing.T) {
atomic.StoreInt64(&throttler.isLeader, 0)
throttler.refreshMySQLInventory(ctx)
validateClusterProbes(t, ctx)
})
}
Loading