client: support backoff mechanism for memberLoop (tikv#6978)
ref tikv#6556

Signed-off-by: husharp <[email protected]>
HuSharp authored Aug 29, 2023
1 parent 75bb796 commit 71e8929
Showing 8 changed files with 209 additions and 22 deletions.
12 changes: 9 additions & 3 deletions client/base_client.go
@@ -29,6 +29,7 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/client/retry"
"github.com/tikv/pd/client/tlsutil"
"go.uber.org/zap"
"google.golang.org/grpc"
@@ -130,18 +131,23 @@ func (c *baseClient) memberLoop() {
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()

ticker := time.NewTicker(memberUpdateInterval)
defer ticker.Stop()

bo := retry.InitialBackOffer(updateMemberBackOffBaseTime, updateMemberTimeout)
for {
select {
case <-c.checkLeaderCh:
case <-time.After(memberUpdateInterval):
case <-ticker.C:
case <-ctx.Done():
log.Info("[pd] exit member loop due to context canceled")
return
}
failpoint.Inject("skipUpdateMember", func() {
failpoint.Continue()
})
if err := c.updateMember(); err != nil {
log.Error("[pd] failed updateMember", errs.ZapError(err))
if err := bo.Exec(ctx, c.updateMember); err != nil {
log.Error("[pd] failed update member with retry", errs.ZapError(err))
}
}
}
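The memberLoop hunk above makes two changes: the per-iteration time.After is replaced by a single reusable ticker, and updateMember is wrapped in the new backoff helper. A minimal sketch of the before/after timer shape (illustrative only, not the committed code; interval and doWork are placeholders):

// Before: time.After allocates a fresh timer on every iteration, and that
// timer is only reclaimed once it fires.
for {
	select {
	case <-time.After(interval):
	case <-ctx.Done():
		return
	}
	doWork()
}

// After: one ticker is created up front, reused on every iteration, and
// released deterministically when the loop exits.
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
	select {
	case <-ticker.C:
	case <-ctx.Done():
		return
	}
	doWork()
}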
19 changes: 11 additions & 8 deletions client/client.go
@@ -32,6 +32,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/client/retry"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -325,12 +326,13 @@ type lastTSO struct {
}

const (
dialTimeout = 3 * time.Second
updateMemberTimeout = time.Second // Use a shorter timeout to recover faster from network isolation.
tsLoopDCCheckInterval = time.Minute
defaultMaxTSOBatchSize = 10000 // should be higher if client is sending requests in burst
retryInterval = 500 * time.Millisecond
maxRetryTimes = 6
dialTimeout = 3 * time.Second
updateMemberTimeout = time.Second // Use a shorter timeout to recover faster from network isolation.
tsLoopDCCheckInterval = time.Minute
defaultMaxTSOBatchSize = 10000 // should be higher if client is sending requests in burst
retryInterval = 500 * time.Millisecond
maxRetryTimes = 6
updateMemberBackOffBaseTime = 100 * time.Millisecond
)

// LeaderHealthCheckInterval might be changed in the unit to shorten the testing time.
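With updateMemberBackOffBaseTime (100ms) as the base and updateMemberTimeout (1s) as the cap, the retry helper introduced below doubles the wait after each consecutive failure: 100ms, 200ms, 400ms, 800ms, then 1s for every further retry. A self-contained sketch of that arithmetic (assuming the same double-and-cap rule as exponentialInterval in client/retry/backoff.go):

package main

import (
	"fmt"
	"time"
)

func main() {
	next, max := 100*time.Millisecond, time.Second
	for i := 0; i < 6; i++ {
		fmt.Println(next) // 100ms 200ms 400ms 800ms 1s 1s
		next *= 2
		if next > max {
			next = max
		}
	}
}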
@@ -765,6 +767,7 @@ func (c *client) handleDispatcher(

// Loop through each batch of TSO requests and send them for processing.
streamLoopTimer := time.NewTimer(c.option.timeout)
bo := retry.InitialBackOffer(updateMemberBackOffBaseTime, updateMemberTimeout)
tsoBatchLoop:
for {
select {
@@ -861,7 +864,7 @@ tsoBatchLoop:
stream = nil
// Because ScheduleCheckLeader is asynchronous, if the leader changes, we better call `updateMember` ASAP.
if IsLeaderChange(err) {
if err := c.updateMember(); err != nil {
if err := bo.Exec(dispatcherCtx, c.updateMember); err != nil {
select {
case <-dispatcherCtx.Done():
return
@@ -885,7 +888,7 @@ func (c *client) allowTSOFollowerProxy(dc string) bool {
}

// chooseStream uses the reservoir sampling algorithm to randomly choose a connection.
// connectionCtxs will only have only one stream to choose when the TSO Follower Proxy is off.
// connectionCtxs will only have one stream to choose when the TSO Follower Proxy is off.
func (c *client) chooseStream(connectionCtxs *sync.Map) (connectionCtx *connectionContext) {
idx := 0
connectionCtxs.Range(func(addr, cc interface{}) bool {
2 changes: 1 addition & 1 deletion client/keyspace_client.go
@@ -16,14 +16,14 @@ package pd

import (
"context"
"go.uber.org/zap"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/log"
"github.com/tikv/pd/client/grpcutil"
"go.uber.org/zap"
"google.golang.org/grpc"
)

86 changes: 86 additions & 0 deletions client/retry/backoff.go
@@ -0,0 +1,86 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retry

import (
"context"
"time"

"github.com/pingcap/failpoint"
)

// BackOffer is a backoff policy for retrying operations.
type BackOffer struct {
max time.Duration
next time.Duration
base time.Duration
}

// Exec is a helper function to exec backoff.
func (bo *BackOffer) Exec(
ctx context.Context,
fn func() error,
) error {
if err := fn(); err != nil {
select {
case <-ctx.Done():
case <-time.After(bo.nextInterval()):
failpoint.Inject("backOffExecute", func() {
testBackOffExecuteFlag = true
})
}
return err
}
// reset backoff when fn() succeed.
bo.resetBackoff()
return nil
}

// InitialBackOffer make the initial state for retrying.
func InitialBackOffer(base, max time.Duration) BackOffer {
return BackOffer{
max: max,
base: base,
next: base,
}
}

// nextInterval for now use the `exponentialInterval`.
func (bo *BackOffer) nextInterval() time.Duration {
return bo.exponentialInterval()
}

// exponentialInterval returns the exponential backoff duration.
func (bo *BackOffer) exponentialInterval() time.Duration {
backoffInterval := bo.next
bo.next *= 2
if bo.next > bo.max {
bo.next = bo.max
}
return backoffInterval
}

// resetBackoff resets the backoff to initial state.
func (bo *BackOffer) resetBackoff() {
bo.next = bo.base
}

// Only used for test.
var testBackOffExecuteFlag = false

// TestBackOffExecute Only used for test.
func TestBackOffExecute() bool {
return testBackOffExecuteFlag
}
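Exec runs fn exactly once per call; on failure it sleeps for the current interval (unless the context is cancelled first) and then returns the error, so the caller owns the retry loop, just as memberLoop does. A minimal usage sketch with a hypothetical flaky operation:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/tikv/pd/client/retry"
)

func main() {
	ctx := context.Background()
	attempts := 0
	// Hypothetical operation that fails twice before succeeding.
	op := func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient error")
		}
		return nil
	}

	bo := retry.InitialBackOffer(100*time.Millisecond, time.Second)
	for {
		if err := bo.Exec(ctx, op); err != nil {
			// Exec has already waited for the current backoff interval
			// (100ms, then 200ms, ...) before returning the error.
			fmt.Println("retrying:", err)
			continue
		}
		break // success also resets the backoff to its base interval
	}
	fmt.Println("succeeded after", attempts, "attempts")
}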
47 changes: 47 additions & 0 deletions client/retry/backoff_test.go
@@ -0,0 +1,47 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retry

import (
"context"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/stretchr/testify/require"
)

func TestExponentialBackoff(t *testing.T) {
re := require.New(t)

baseBackoff := 100 * time.Millisecond
maxBackoff := 1 * time.Second

backoff := InitialBackOffer(baseBackoff, maxBackoff)
re.Equal(backoff.nextInterval(), baseBackoff)
re.Equal(backoff.nextInterval(), 2*baseBackoff)

for i := 0; i < 10; i++ {
re.LessOrEqual(backoff.nextInterval(), maxBackoff)
}
re.Equal(backoff.nextInterval(), maxBackoff)

// Reset backoff
backoff.resetBackoff()
err := backoff.Exec(context.Background(), func() error {
return errors.New("test")
})
re.Error(err)
}
2 changes: 1 addition & 1 deletion server/server.go
@@ -1444,7 +1444,7 @@ func (s *Server) leaderLoop() {
if s.member.GetLeader() == nil {
lastUpdated := s.member.GetLastLeaderUpdatedTime()
// use random timeout to avoid leader campaigning storm.
randomTimeout := time.Duration(rand.Intn(int(lostPDLeaderMaxTimeoutSecs)))*time.Second + lostPDLeaderMaxTimeoutSecs*time.Second + lostPDLeaderReElectionFactor*s.cfg.ElectionInterval.Duration
randomTimeout := time.Duration(rand.Intn(lostPDLeaderMaxTimeoutSecs))*time.Second + lostPDLeaderMaxTimeoutSecs*time.Second + lostPDLeaderReElectionFactor*s.cfg.ElectionInterval.Duration
// add failpoint to test the campaign leader logic.
failpoint.Inject("timeoutWaitPDLeader", func() {
log.Info("timeoutWaitPDLeader is injected, skip wait other etcd leader be etcd leader")
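The server.go change only drops a redundant conversion: assuming lostPDLeaderMaxTimeoutSecs is an untyped integer constant (the lostPDLeaderMaxTimeoutSecs*time.Second term on the same line suggests it is), rand.Intn accepts it without an explicit int() cast. A small sketch with an assumed value of 10:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Assumption: defined as an untyped integer constant, as in PD's server package.
const lostPDLeaderMaxTimeoutSecs = 10

func main() {
	// The untyped constant converts implicitly to rand.Intn's int parameter,
	// so the explicit int() wrapper removed by this hunk was unnecessary.
	randomTimeout := time.Duration(rand.Intn(lostPDLeaderMaxTimeoutSecs))*time.Second + lostPDLeaderMaxTimeoutSecs*time.Second
	fmt.Println(randomTimeout)
}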
47 changes: 47 additions & 0 deletions tests/client/client_test.go
@@ -33,6 +33,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/retry"
"github.com/tikv/pd/pkg/assertutil"
"github.com/tikv/pd/pkg/mock/mockid"
"github.com/tikv/pd/pkg/testutil"
@@ -1415,3 +1416,49 @@ func (suite *clientTestSuite) TestScatterRegion() {
resp.GetStatus() == pdpb.OperatorStatus_RUNNING
}, testutil.WithTickInterval(time.Second))
}

func (suite *clientTestSuite) TestMemberUpdateBackOff() {
re := suite.Require()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestCluster(ctx, 3)
re.NoError(err)
defer cluster.Destroy()

endpoints := runServer(re, cluster)
cli := setupCli(re, ctx, endpoints)
defer cli.Close()

leader := cluster.GetLeader()
waitLeader(re, cli.(client), cluster.GetServer(leader).GetConfig().ClientUrls)
memberID := cluster.GetServer(leader).GetLeader().GetMemberId()

re.NoError(failpoint.Enable("github.com/tikv/pd/server/leaderLoopCheckAgain", fmt.Sprintf("return(\"%d\")", memberID)))
re.NoError(failpoint.Enable("github.com/tikv/pd/server/exitCampaignLeader", fmt.Sprintf("return(\"%d\")", memberID)))
re.NoError(failpoint.Enable("github.com/tikv/pd/server/timeoutWaitPDLeader", `return(true)`))
// make sure back off executed.
re.NoError(failpoint.Enable("github.com/tikv/pd/client/retry/backOffExecute", `return(true)`))
leader2 := waitLeaderChange(re, cluster, leader, cli.(client))
re.True(retry.TestBackOffExecute())

re.NotEqual(leader, leader2)

re.NoError(failpoint.Disable("github.com/tikv/pd/server/leaderLoopCheckAgain"))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/exitCampaignLeader"))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/timeoutWaitPDLeader"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/retry/backOffExecute"))
}

func waitLeaderChange(re *require.Assertions, cluster *tests.TestCluster, old string, cli client) string {
var leader string
testutil.Eventually(re, func() bool {
cli.ScheduleCheckLeader()
leader = cluster.GetLeader()
if leader == old || leader == "" {
return false
}
return true
})
return leader
}
16 changes: 7 additions & 9 deletions tests/pdctl/scheduler/scheduler_test.go
@@ -397,22 +397,20 @@ func TestScheduler(t *testing.T) {
pdctl.MustPutStore(re, leaderServer.GetServer(), store)
}
re.Equal("5.2.0", leaderServer.GetClusterVersion().String())
mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
// After upgrading, we should not use query.
expected1["read-priorities"] = []interface{}{"query", "byte"}
re.NotEqual(expected1, conf1)
expected1["read-priorities"] = []interface{}{"key", "byte"}
re.Equal(expected1, conf1)
mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
re.Equal(conf1["read-priorities"], []interface{}{"key", "byte"})
// cannot set qps as write-peer-priorities
echo = mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-peer-priorities", "query,byte"}, nil)
re.Contains(echo, "query is not allowed to be set in priorities for write-peer-priorities")
mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
re.Equal(expected1, conf1)
re.Equal(conf1["write-peer-priorities"], []interface{}{"byte", "key"})

// test remove and add
mustExec([]string{"-u", pdAddr, "scheduler", "remove", "balance-hot-region-scheduler"}, nil)
mustExec([]string{"-u", pdAddr, "scheduler", "add", "balance-hot-region-scheduler"}, nil)
re.Equal(expected1, conf1)
echo = mustExec([]string{"-u", pdAddr, "scheduler", "remove", "balance-hot-region-scheduler"}, nil)
re.Contains(echo, "Success")
echo = mustExec([]string{"-u", pdAddr, "scheduler", "add", "balance-hot-region-scheduler"}, nil)
re.Contains(echo, "Success")

// test balance leader config
conf = make(map[string]interface{})
