Skip to content

Commit

Permalink
executor: disable closest replica read if cluster is not balanced (#3…
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv authored Sep 7, 2022
1 parent e0b5fa1 commit 36b6710
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 4 deletions.
98 changes: 98 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"math/rand"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -27,11 +28,13 @@ import (
"github.com/ngaut/pools"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/ddl/schematracker"
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain/globalconfigsync"
Expand All @@ -57,6 +60,7 @@ import (
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/domainutil"
"github.com/pingcap/tidb/util/engine"
"github.com/pingcap/tidb/util/expensivequery"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
Expand Down Expand Up @@ -908,6 +912,10 @@ func (do *Domain) Init(
do.wg.Add(1)
go do.topologySyncerKeeper()
}
if pdClient != nil {
do.wg.Add(1)
go do.closestReplicaReadCheckLoop(ctx, pdClient)
}
err = do.initLogBackup(ctx, pdClient)
if err != nil {
return err
Expand Down Expand Up @@ -936,6 +944,96 @@ func (do *Domain) initLogBackup(ctx context.Context, pdClient pd.Client) error {
return nil
}

// when tidb_replica_read = 'closest-adaptive', check tidb and tikv's zone label matches.
// if not match, disable replica_read to avoid uneven read traffic distribution.
func (do *Domain) closestReplicaReadCheckLoop(ctx context.Context, pdClient pd.Client) {
defer util.Recover(metrics.LabelDomain, "closestReplicaReadCheckLoop", nil, false)

// trigger check once instantly.
if err := do.checkReplicaRead(ctx, pdClient); err != nil {
logutil.BgLogger().Warn("refresh replicaRead flag failed", zap.Error(err))
}

ticker := time.NewTicker(time.Minute)
defer func() {
ticker.Stop()
do.wg.Done()
logutil.BgLogger().Info("closestReplicaReadCheckLoop exited.")
}()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := do.checkReplicaRead(ctx, pdClient); err != nil {
logutil.BgLogger().Warn("refresh replicaRead flag failed", zap.Error(err))
}
}
}
}

func (do *Domain) checkReplicaRead(ctx context.Context, pdClient pd.Client) error {
// fast path
do.sysVarCache.RLock()
replicaRead := do.sysVarCache.global[variable.TiDBReplicaRead]
do.sysVarCache.RUnlock()

if !strings.EqualFold(replicaRead, "closest-adaptive") {
logutil.BgLogger().Debug("closest replica read is not enabled, skip check!", zap.String("mode", replicaRead))
return nil
}
stores, err := pdClient.GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
return err
}

storeZones := make(map[string]int)
for _, s := range stores {
// skip tumbstone stores or tiflash
if s.NodeState == metapb.NodeState_Removing || s.NodeState == metapb.NodeState_Removed || engine.IsTiFlash(s) {
continue
}
for _, label := range s.Labels {
if label.Key == placement.DCLabelKey && label.Value != "" {
storeZones[label.Value] = 0
break
}
}
}

enabled := false
// if stores don't have zone labels or are distribued in 1 zone, just disable cloeset replica read.
if len(storeZones) > 1 {
enabled = true
servers, err := infosync.GetAllServerInfo(ctx)
if err != nil {
return err
}
for _, s := range servers {
if v, ok := s.Labels[placement.DCLabelKey]; ok && v != "" {
if _, ok := storeZones[v]; !ok {
enabled = false
break
}
storeZones[v] += 1
}
}
if enabled {
for _, count := range storeZones {
if count == 0 {
enabled = false
break
}
}
}
}

if variable.SetEnableAdaptiveReplicaRead(enabled) {
logutil.BgLogger().Info("tidb server adaptive closest replica read is changed", zap.Bool("enable", enabled))
}
return nil
}

type sessionPool struct {
resources chan pools.Resource
factory pools.Factory
Expand Down
89 changes: 89 additions & 0 deletions domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/ngaut/pools"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/kv"
Expand All @@ -36,6 +37,7 @@ import (
"github.com/pingcap/tidb/util/mock"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/tests/v3/integration"
)

Expand Down Expand Up @@ -228,3 +230,90 @@ func (mebd *mockEtcdBackend) TLSConfig() *tls.Config { return nil }
func (mebd *mockEtcdBackend) StartGCWorker() error {
panic("not implemented")
}

func TestClosestReplicaReadChecker(t *testing.T) {
store, err := mockstore.NewMockStore()
require.NoError(t, err)

ddlLease := 80 * time.Millisecond
dom := NewDomain(store, ddlLease, 0, 0, 0, mockFactory, nil)
defer func() {
dom.Close()
require.Nil(t, store.Close())
}()
dom.sysVarCache.Lock()
dom.sysVarCache.global = map[string]string{
variable.TiDBReplicaRead: "closest-adaptive",
}
dom.sysVarCache.Unlock()

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/mockGetAllServerInfo", `return("")`))

stores := []*metapb.Store{
{
Labels: []*metapb.StoreLabel{
{
Key: "zone",
Value: "zone1",
},
},
},
{
Labels: []*metapb.StoreLabel{
{
Key: "zone",
Value: "zone2",
},
},
},
{
Labels: []*metapb.StoreLabel{
{
Key: "zone",
Value: "zone3",
},
},
},
}

enabled := variable.IsAdaptiveReplicaReadEnabled()

ctx := context.Background()
pdClient := &mockInfoPdClient{}

// check error
pdClient.err = errors.New("mock error")
err = dom.checkReplicaRead(ctx, pdClient)
require.Error(t, err)
require.Equal(t, enabled, variable.IsAdaptiveReplicaReadEnabled())

// labels matches, should be enabled
pdClient.err = nil
pdClient.stores = stores[:2]
variable.SetEnableAdaptiveReplicaRead(false)
err = dom.checkReplicaRead(ctx, pdClient)
require.Nil(t, err)
require.True(t, variable.IsAdaptiveReplicaReadEnabled())

// labels don't match, should disable the flag
for _, i := range []int{0, 1, 3} {
pdClient.stores = stores[:i]
variable.SetEnableAdaptiveReplicaRead(true)
err = dom.checkReplicaRead(ctx, pdClient)
require.Nil(t, err)
require.False(t, variable.IsAdaptiveReplicaReadEnabled())
}

variable.SetEnableAdaptiveReplicaRead(true)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/infosync/mockGetAllServerInfo"))
}

type mockInfoPdClient struct {
pd.Client
stores []*metapb.Store
err error
}

func (c *mockInfoPdClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]*metapb.Store, error) {
return c.stores, c.err
}
15 changes: 15 additions & 0 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,21 @@ func (is *InfoSyncer) getServerInfoByID(ctx context.Context, id string) (*Server

// GetAllServerInfo gets all servers static information from etcd.
func GetAllServerInfo(ctx context.Context) (map[string]*ServerInfo, error) {
failpoint.Inject("mockGetAllServerInfo", func() {
res := map[string]*ServerInfo{
"fa598405-a08e-4e74-83ff-75c30b1daedc": {
Labels: map[string]string{
"zone": "zone1",
},
},
"ad84dbbd-5a50-4742-a73c-4f674d41d4bd": {
Labels: map[string]string{
"zone": "zone2",
},
},
}
failpoint.Return(res, nil)
})
is, err := getGlobalInfoSyncer()
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4714,7 +4714,7 @@ func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan
b.err = err
return nil
}
if e.ctx.GetSessionVars().GetReplicaRead() == kv.ReplicaReadClosestAdaptive {
if e.ctx.GetSessionVars().IsReplicaReadClosestAdaptive() {
e.snapshot.SetOption(kv.ReplicaReadAdjuster, newReplicaReadAdjuster(e.ctx, plan.GetAvgRowSize()))
}
if e.runtimeStats != nil {
Expand Down
2 changes: 1 addition & 1 deletion executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor {
b.err = err
return nil
}
if b.ctx.GetSessionVars().GetReplicaRead() == kv.ReplicaReadClosestAdaptive {
if b.ctx.GetSessionVars().IsReplicaReadClosestAdaptive() {
e.snapshot.SetOption(kv.ReplicaReadAdjuster, newReplicaReadAdjuster(e.ctx, p.GetAvgRowSize()))
}
if e.runtimeStats != nil {
Expand Down
34 changes: 32 additions & 2 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,29 @@ import (
"golang.org/x/exp/slices"
)

// PreparedStmtCount is exported for test.
var PreparedStmtCount int64
var (
// PreparedStmtCount is exported for test.
PreparedStmtCount int64
// enableAdaptiveReplicaRead indicates whether closest adaptive replica read
// can be enabled. We forces disable replica read when tidb server in missing
// in regions that contains tikv server to avoid read traffic skew.
enableAdaptiveReplicaRead uint32 = 1
)

// SetEnableAdaptiveReplicaRead set `enableAdaptiveReplicaRead` with given value.
// return true if the value is changed.
func SetEnableAdaptiveReplicaRead(enabled bool) bool {
value := uint32(0)
if enabled {
value = 1
}
return atomic.SwapUint32(&enableAdaptiveReplicaRead, value) != value
}

// IsAdaptiveReplicaReadEnabled returns whether adaptive closest replica read can be enabled.
func IsAdaptiveReplicaReadEnabled() bool {
return atomic.LoadUint32(&enableAdaptiveReplicaRead) > 0
}

// RetryInfo saves retry information.
type RetryInfo struct {
Expand Down Expand Up @@ -1638,6 +1659,10 @@ func (s *SessionVars) GetReplicaRead() kv.ReplicaReadType {
if s.StmtCtx.HasReplicaReadHint {
return kv.ReplicaReadType(s.StmtCtx.ReplicaRead)
}
// if closest-adaptive is unavailable, fallback to leader read
if s.replicaRead == kv.ReplicaReadClosestAdaptive && !IsAdaptiveReplicaReadEnabled() {
return kv.ReplicaReadLeader
}
return s.replicaRead
}

Expand All @@ -1646,6 +1671,11 @@ func (s *SessionVars) SetReplicaRead(val kv.ReplicaReadType) {
s.replicaRead = val
}

// IsReplicaReadClosestAdaptive returns whether adaptive closest replica can be enabled.
func (s *SessionVars) IsReplicaReadClosestAdaptive() bool {
return s.replicaRead == kv.ReplicaReadClosestAdaptive && IsAdaptiveReplicaReadEnabled()
}

// GetWriteStmtBufs get pointer of SessionVars.writeStmtBufs.
func (s *SessionVars) GetWriteStmtBufs() *WriteStmtBufs {
return &s.writeStmtBufs
Expand Down

0 comments on commit 36b6710

Please sign in to comment.