Skip to content

Commit

Permalink
domain: disable closest-adaptive dynamically to make read traffic mor…
Browse files Browse the repository at this point in the history
…e even (#38960)

ref #35926
  • Loading branch information
glorv authored Nov 22, 2022
1 parent 75efe68 commit c9bb2f2
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 37 deletions.
84 changes: 62 additions & 22 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ package domain
import (
"context"
"fmt"
"math"
"math/rand"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -1106,8 +1108,12 @@ func (do *Domain) closestReplicaReadCheckLoop(ctx context.Context, pdClient pd.C
}
}

// Periodically check and update the replica-read status when `tidb_replica_read` is set to "closest-adaptive".
// We disable "closest-adaptive" in the following conditions to ensure the read traffic is evenly distributed across
// all AZs:
// - There are no TiKV servers in the AZ of this tidb instance.
// - The AZ of this tidb contains more tidb instances than other AZs and this tidb's ID is one of the bigger ones.
func (do *Domain) checkReplicaRead(ctx context.Context, pdClient pd.Client) error {
// fast path
do.sysVarCache.RLock()
replicaRead := do.sysVarCache.global[variable.TiDBReplicaRead]
do.sysVarCache.RUnlock()
Expand All @@ -1116,6 +1122,24 @@ func (do *Domain) checkReplicaRead(ctx context.Context, pdClient pd.Client) erro
logutil.BgLogger().Debug("closest replica read is not enabled, skip check!", zap.String("mode", replicaRead))
return nil
}

serverInfo, err := infosync.GetServerInfo()
if err != nil {
return err
}
zone := ""
for k, v := range serverInfo.Labels {
if k == placement.DCLabelKey && v != "" {
zone = v
break
}
}
if zone == "" {
logutil.BgLogger().Debug("server contains no 'zone' label, disable closest replica read", zap.Any("labels", serverInfo.Labels))
variable.SetEnableAdaptiveReplicaRead(false)
return nil
}

stores, err := pdClient.GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
return err
Expand All @@ -1135,32 +1159,48 @@ func (do *Domain) checkReplicaRead(ctx context.Context, pdClient pd.Client) erro
}
}

enabled := false
// if stores don't have zone labels or are distributed in 1 zone, just disable closest replica read.
if len(storeZones) > 1 {
enabled = true
servers, err := infosync.GetAllServerInfo(ctx)
if err != nil {
return err
}
for _, s := range servers {
if v, ok := s.Labels[placement.DCLabelKey]; ok && v != "" {
if _, ok := storeZones[v]; !ok {
enabled = false
break
}
// no stores in this AZ
if _, ok := storeZones[zone]; !ok {
variable.SetEnableAdaptiveReplicaRead(false)
return nil
}

servers, err := infosync.GetAllServerInfo(ctx)
if err != nil {
return err
}
svrIdsInThisZone := make([]string, 0)
for _, s := range servers {
if v, ok := s.Labels[placement.DCLabelKey]; ok && v != "" {
if _, ok := storeZones[v]; ok {
storeZones[v] += 1
}
}
if enabled {
for _, count := range storeZones {
if count == 0 {
enabled = false
break
if v == zone {
svrIdsInThisZone = append(svrIdsInThisZone, s.ID)
}
}
}
}
enabledCount := math.MaxInt
for _, count := range storeZones {
if count < enabledCount {
enabledCount = count
}
}
// Sort the tidb instances in the same AZ by ID and disable the ones with the bigger IDs.
// Because the ID is unchangeable, this is a simple and stable algorithm to select
// some instances across all tidb servers.
if enabledCount < len(svrIdsInThisZone) {
sort.Slice(svrIdsInThisZone, func(i, j int) bool {
return strings.Compare(svrIdsInThisZone[i], svrIdsInThisZone[j]) < 0
})
}
enabled := true
for _, s := range svrIdsInThisZone[enabledCount:] {
if s == serverInfo.ID {
enabled = false
break
}
}

if variable.SetEnableAdaptiveReplicaRead(enabled) {
logutil.BgLogger().Info("tidb server adaptive closest replica read is changed", zap.Bool("enable", enabled))
Expand Down
95 changes: 94 additions & 1 deletion domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package domain
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net"
"runtime"
"testing"
Expand Down Expand Up @@ -247,7 +249,29 @@ func TestClosestReplicaReadChecker(t *testing.T) {
}
dom.sysVarCache.Unlock()

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/mockGetAllServerInfo", `return("")`))
makeFailpointRes := func(v interface{}) string {
bytes, err := json.Marshal(v)
require.NoError(t, err)
return fmt.Sprintf("return(`%s`)", string(bytes))
}

mockedAllServerInfos := map[string]*infosync.ServerInfo{
"s1": {
ID: "s1",
Labels: map[string]string{
"zone": "zone1",
},
},
"s2": {
ID: "s2",
Labels: map[string]string{
"zone": "zone2",
},
},
}

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/mockGetAllServerInfo", makeFailpointRes(mockedAllServerInfos)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/mockGetServerInfo", makeFailpointRes(mockedAllServerInfos["s2"])))

stores := []*metapb.Store{
{
Expand Down Expand Up @@ -304,8 +328,77 @@ func TestClosestReplicaReadChecker(t *testing.T) {
require.False(t, variable.IsAdaptiveReplicaReadEnabled())
}

// partial matches
mockedAllServerInfos = map[string]*infosync.ServerInfo{
"s1": {
ID: "s1",
Labels: map[string]string{
"zone": "zone1",
},
},
"s2": {
ID: "s2",
Labels: map[string]string{
"zone": "zone2",
},
},
"s22": {
ID: "s22",
Labels: map[string]string{
"zone": "zone2",
},
},
"s3": {
ID: "s3",
Labels: map[string]string{
"zone": "zone3",
},
},
"s4": {
ID: "s4",
Labels: map[string]string{
"zone": "zone4",
},
},
}
pdClient.stores = stores
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/mockGetAllServerInfo", makeFailpointRes(mockedAllServerInfos)))
cases := []struct {
id string
matches bool
}{
{
id: "s1",
matches: true,
},
{
id: "s2",
matches: true,
},
{
id: "s22",
matches: false,
},
{
id: "s3",
matches: true,
},
{
id: "s4",
matches: false,
},
}
for _, c := range cases {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/mockGetServerInfo", makeFailpointRes(mockedAllServerInfos[c.id])))
variable.SetEnableAdaptiveReplicaRead(!c.matches)
err = dom.checkReplicaRead(ctx, pdClient)
require.Nil(t, err)
require.Equal(t, c.matches, variable.IsAdaptiveReplicaReadEnabled())
}

variable.SetEnableAdaptiveReplicaRead(true)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/infosync/mockGetAllServerInfo"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/infosync/mockGetServerInfo"))
}

type mockInfoPdClient struct {
Expand Down
23 changes: 9 additions & 14 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,11 @@ func SetMockTiFlash(tiflash *MockTiFlash) {

// GetServerInfo gets self server static information.
func GetServerInfo() (*ServerInfo, error) {
failpoint.Inject("mockGetServerInfo", func(v failpoint.Value) {
var res ServerInfo
err := json.Unmarshal([]byte(v.(string)), &res)
failpoint.Return(&res, err)
})
is, err := getGlobalInfoSyncer()
if err != nil {
return nil, err
Expand Down Expand Up @@ -316,20 +321,10 @@ func (is *InfoSyncer) getServerInfoByID(ctx context.Context, id string) (*Server

// GetAllServerInfo gets all servers static information from etcd.
func GetAllServerInfo(ctx context.Context) (map[string]*ServerInfo, error) {
failpoint.Inject("mockGetAllServerInfo", func() {
res := map[string]*ServerInfo{
"fa598405-a08e-4e74-83ff-75c30b1daedc": {
Labels: map[string]string{
"zone": "zone1",
},
},
"ad84dbbd-5a50-4742-a73c-4f674d41d4bd": {
Labels: map[string]string{
"zone": "zone2",
},
},
}
failpoint.Return(res, nil)
failpoint.Inject("mockGetAllServerInfo", func(val failpoint.Value) {
res := make(map[string]*ServerInfo)
err := json.Unmarshal([]byte(val.(string)), &res)
failpoint.Return(res, err)
})
is, err := getGlobalInfoSyncer()
if err != nil {
Expand Down

0 comments on commit c9bb2f2

Please sign in to comment.