Skip to content

Commit

Permalink
ddl: add require owner interface (pingcap#34407)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiongjiwei authored May 9, 2022
1 parent 71bfde9 commit 20ecaef
Show file tree
Hide file tree
Showing 14 changed files with 152 additions and 123 deletions.
5 changes: 2 additions & 3 deletions ddl/util/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/parser/terror"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -172,7 +171,7 @@ func (s *schemaVersionSyncer) Init(ctx context.Context) error {
return errors.Trace(err)
}
logPrefix := fmt.Sprintf("[%s] %s", ddlPrompt, s.selfSchemaVerPath)
session, err := owner.NewSession(ctx, logPrefix, s.etcdCli, owner.NewSessionDefaultRetryCnt, SyncerSessionTTL)
session, err := tidbutil.NewSession(ctx, logPrefix, s.etcdCli, tidbutil.NewSessionDefaultRetryCnt, SyncerSessionTTL)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -217,7 +216,7 @@ func (s *schemaVersionSyncer) Restart(ctx context.Context) error {

logPrefix := fmt.Sprintf("[%s] %s", ddlPrompt, s.selfSchemaVerPath)
// NewSession's context will affect the exit of the session.
session, err := owner.NewSession(ctx, logPrefix, s.etcdCli, owner.NewSessionRetryUnlimited, SyncerSessionTTL)
session, err := tidbutil.NewSession(ctx, logPrefix, s.etcdCli, tidbutil.NewSessionRetryUnlimited, SyncerSessionTTL)
if err != nil {
return errors.Trace(err)
}
Expand Down
3 changes: 1 addition & 2 deletions ddl/util/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
. "github.com/pingcap/tidb/ddl"
. "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -162,7 +161,7 @@ func TestSyncerSimple(t *testing.T) {
NeededCleanTTL = int64(11)
ttlKey := "session_ttl_key"
ttlVal := "session_ttl_val"
session, err := owner.NewSession(ctx, "", cli, owner.NewSessionDefaultRetryCnt, ttl)
session, err := util.NewSession(ctx, "", cli, util.NewSessionDefaultRetryCnt, ttl)
require.NoError(t, err)
require.NoError(t, PutKVToEtcd(context.Background(), cli, 5, ttlKey, ttlVal, clientv3.WithLease(session.Lease())))

Expand Down
13 changes: 6 additions & 7 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
Expand Down Expand Up @@ -196,14 +195,14 @@ func GlobalInfoSyncerInit(ctx context.Context, id string, serverIDGetter func()

// init creates a new etcd session and stores server info to etcd.
func (is *InfoSyncer) init(ctx context.Context, skipRegisterToDashboard bool) error {
err := is.newSessionAndStoreServerInfo(ctx, owner.NewSessionDefaultRetryCnt)
err := is.newSessionAndStoreServerInfo(ctx, util2.NewSessionDefaultRetryCnt)
if err != nil {
return err
}
if skipRegisterToDashboard {
return nil
}
return is.newTopologySessionAndStoreServerInfo(ctx, owner.NewSessionDefaultRetryCnt)
return is.newTopologySessionAndStoreServerInfo(ctx, util2.NewSessionDefaultRetryCnt)
}

// SetSessionManager set the session manager for InfoSyncer.
Expand Down Expand Up @@ -683,12 +682,12 @@ func (is *InfoSyncer) TopologyDone() <-chan struct{} {

// Restart restarts the info syncer with a new session leaseID and stores server info to etcd again.
func (is *InfoSyncer) Restart(ctx context.Context) error {
return is.newSessionAndStoreServerInfo(ctx, owner.NewSessionDefaultRetryCnt)
return is.newSessionAndStoreServerInfo(ctx, util2.NewSessionDefaultRetryCnt)
}

// RestartTopology restarts the topology syncer with a new session leaseID and stores server info to etcd again.
func (is *InfoSyncer) RestartTopology(ctx context.Context) error {
return is.newTopologySessionAndStoreServerInfo(ctx, owner.NewSessionDefaultRetryCnt)
return is.newTopologySessionAndStoreServerInfo(ctx, util2.NewSessionDefaultRetryCnt)
}

// GetAllTiDBTopology gets all tidb topology
Expand Down Expand Up @@ -718,7 +717,7 @@ func (is *InfoSyncer) newSessionAndStoreServerInfo(ctx context.Context, retryCnt
return nil
}
logPrefix := fmt.Sprintf("[Info-syncer] %s", is.serverInfoPath)
session, err := owner.NewSession(ctx, logPrefix, is.etcdCli, retryCnt, InfoSessionTTL)
session, err := util2.NewSession(ctx, logPrefix, is.etcdCli, retryCnt, InfoSessionTTL)
if err != nil {
return err
}
Expand All @@ -737,7 +736,7 @@ func (is *InfoSyncer) newTopologySessionAndStoreServerInfo(ctx context.Context,
return nil
}
logPrefix := fmt.Sprintf("[topology-syncer] %s/%s:%d", TopologyInformationPath, is.info.IP, is.info.Port)
session, err := owner.NewSession(ctx, logPrefix, is.etcdCli, retryCnt, TopologySessionTTL)
session, err := util2.NewSession(ctx, logPrefix, is.etcdCli, retryCnt, TopologySessionTTL)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions domain/infosync/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/parser/model"
util2 "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/testbridge"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/tests/v3/integration"
Expand Down Expand Up @@ -69,7 +69,7 @@ func TestTopology(t *testing.T) {
info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, false)
require.NoError(t, err)

err = info.newTopologySessionAndStoreServerInfo(ctx, owner.NewSessionDefaultRetryCnt)
err = info.newTopologySessionAndStoreServerInfo(ctx, util2.NewSessionDefaultRetryCnt)
require.NoError(t, err)

topology, err := info.getTopologyFromEtcd(ctx)
Expand All @@ -84,7 +84,7 @@ func TestTopology(t *testing.T) {
nonTTLKey := fmt.Sprintf("%s/%s:%v/info", TopologyInformationPath, info.info.IP, info.info.Port)
ttlKey := fmt.Sprintf("%s/%s:%v/ttl", TopologyInformationPath, info.info.IP, info.info.Port)

err = util.DeleteKeyFromEtcd(nonTTLKey, client, owner.NewSessionDefaultRetryCnt, time.Second)
err = util.DeleteKeyFromEtcd(nonTTLKey, client, util2.NewSessionDefaultRetryCnt, time.Second)
require.NoError(t, err)

// Refresh and re-test if the key exists
Expand All @@ -107,7 +107,7 @@ func TestTopology(t *testing.T) {
require.NoError(t, err)
require.True(t, ttlExists)

err = util.DeleteKeyFromEtcd(ttlKey, client, owner.NewSessionDefaultRetryCnt, time.Second)
err = util.DeleteKeyFromEtcd(ttlKey, client, util2.NewSessionDefaultRetryCnt, time.Second)
require.NoError(t, err)

err = info.updateTopologyAliveness(ctx)
Expand Down
3 changes: 1 addition & 2 deletions expression/builtin_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,8 +478,7 @@ func (b *builtinTiDBIsDDLOwnerSig) Clone() builtinFunc {

// evalInt evals a builtinTiDBIsDDLOwnerSig.
func (b *builtinTiDBIsDDLOwnerSig) evalInt(_ chunk.Row) (res int64, isNull bool, err error) {
ddlOwnerChecker := b.ctx.DDLOwnerChecker()
if ddlOwnerChecker.IsOwner() {
if b.ctx.IsDDLOwner() {
res = 1
}

Expand Down
3 changes: 1 addition & 2 deletions expression/builtin_info_vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,8 @@ func (b *builtinTiDBIsDDLOwnerSig) vectorized() bool {

func (b *builtinTiDBIsDDLOwnerSig) vecEvalInt(input *chunk.Chunk, result *chunk.Column) error {
n := input.NumRows()
ddlOwnerChecker := b.ctx.DDLOwnerChecker()
var res int64
if ddlOwnerChecker.IsOwner() {
if b.ctx.IsDDLOwner() {
res = 1
}
result.ResizeInt64(n, false)
Expand Down
4 changes: 1 addition & 3 deletions expression/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2870,10 +2870,8 @@ func TestTiDBIsOwnerFunc(t *testing.T) {

tk := testkit.NewTestKit(t, store)
result := tk.MustQuery("select tidb_is_ddl_owner()")
ddlOwnerChecker := tk.Session().DDLOwnerChecker()
require.NotNil(t, ddlOwnerChecker)
var ret int64
if ddlOwnerChecker.IsOwner() {
if tk.Session().IsDDLOwner() {
ret = 1
}
result.Check(testkit.Rows(fmt.Sprintf("%v", ret)))
Expand Down
12 changes: 6 additions & 6 deletions owner/fail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,17 @@ func TestFailNewSession(t *testing.T) {
if cli != nil {
_ = cli.Close()
}
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/owner/closeClient"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/closeClient"))
}()
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/owner/closeClient", `return(true)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/closeClient", `return(true)`))

// TODO: It takes more than 2s here in etcd client, the CI takes 5s to run this test.
// The config is hard coded, there is no way to control it from outside.
// Call stack:
// https://github.com/etcd-io/etcd/blob/ae9734e/clientv3/concurrency/session.go#L38
// https://github.com/etcd-io/etcd/blob/ae9734ed278b7a1a7dfc82e800471ebbf9fce56f/clientv3/client.go#L253
// https://github.com/etcd-io/etcd/blob/ae9734ed278b7a1a7dfc82e800471ebbf9fce56f/clientv3/retry_interceptor.go#L63
_, err = NewSession(context.Background(), "fail_new_session", cli, retryCnt, ManagerSessionTTL)
_, err = util.NewSession(context.Background(), "fail_new_session", cli, retryCnt, ManagerSessionTTL)
isContextDone := terror.ErrorEqual(grpc.ErrClientConnClosing, err) || terror.ErrorEqual(context.Canceled, err)
require.Truef(t, isContextDone, "err %v", err)
}()
Expand All @@ -99,13 +99,13 @@ func TestFailNewSession(t *testing.T) {
if cli != nil {
_ = cli.Close()
}
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/owner/closeGrpc"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/closeGrpc"))
}()
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/owner/closeGrpc", `return(true)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/closeGrpc", `return(true)`))

// TODO: It takes more than 2s here in etcd client, the CI takes 5s to run this test.
// The config is hard coded, there is no way to control it from outside.
_, err = NewSession(context.Background(), "fail_new_session", cli, retryCnt, ManagerSessionTTL)
_, err = util.NewSession(context.Background(), "fail_new_session", cli, retryCnt, ManagerSessionTTL)
isContextDone := terror.ErrorEqual(grpc.ErrClientConnClosing, err) || terror.ErrorEqual(context.Canceled, err)
require.Truef(t, isContextDone, "err %v", err)
}()
Expand Down
88 changes: 11 additions & 77 deletions owner/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package owner
import (
"context"
"fmt"
"math"
"os"
"strconv"
"sync"
Expand All @@ -26,21 +25,15 @@ import (
"unsafe"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/terror"
util2 "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
"go.uber.org/zap"
"google.golang.org/grpc"
)

const (
newSessionRetryInterval = 200 * time.Millisecond
logIntervalCnt = int(3 * time.Second / newSessionRetryInterval)
)

// Manager is used to campaign the owner and manage the owner information.
Expand All @@ -59,14 +52,12 @@ type Manager interface {
ResignOwner(ctx context.Context) error
// Cancel cancels this etcd ownerManager campaign.
Cancel()
// RequireOwner requires the ownerManager is owner.
RequireOwner(ctx context.Context) error
}

const (
// NewSessionDefaultRetryCnt is the default retry times when create new session.
NewSessionDefaultRetryCnt = 3
// NewSessionRetryUnlimited is the unlimited retry times when create new session.
NewSessionRetryUnlimited = math.MaxInt64
keyOpDefaultTimeout = 5 * time.Second
keyOpDefaultTimeout = 5 * time.Second
)

// DDLOwnerChecker is used to check whether tidb is owner.
Expand Down Expand Up @@ -121,6 +112,11 @@ func (m *ownerManager) Cancel() {
m.wg.Wait()
}

// RequireOwner implements Manager.RequireOwner interface.
// NOTE(review): the body is currently a stub — it ignores ctx and always
// returns nil without consulting the manager's ownership state; confirm this
// is the intended behavior before relying on it as an ownership check.
func (m *ownerManager) RequireOwner(ctx context.Context) error {
return nil
}

// ManagerSessionTTL is the etcd session's TTL in seconds. It's exported for testing.
var ManagerSessionTTL = 60

Expand All @@ -138,55 +134,11 @@ func setManagerSessionTTL() error {
return nil
}

// NewSession creates a new etcd session.
//
// It makes up to retryCnt attempts to call concurrency.NewSession on etcdCli
// with the given ttl (passed to concurrency.WithTTL) and ctx (passed to
// concurrency.WithContext). logPrefix is used both as the metrics label and
// as the "ownerInfo" field of the warning log.
//
// It returns the session from the first successful attempt. Otherwise it
// returns the traced error from contextDone (ctx finished, or the previous
// attempt failed with a cancellation-like error) or the traced error of the
// last failed attempt. If retryCnt <= 0 the loop body never runs, so the
// zero-value session is returned together with errors.Trace of a nil error.
func NewSession(ctx context.Context, logPrefix string, etcdCli *clientv3.Client, retryCnt, ttl int) (*concurrency.Session, error) {
var err error

var etcdSession *concurrency.Session
failedCnt := 0
for i := 0; i < retryCnt; i++ {
// Stop retrying once ctx is done or the previous attempt's error shows
// that the client/connection is being shut down.
if err = contextDone(ctx, err); err != nil {
return etcdSession, errors.Trace(err)
}

// Test-only failpoint: closes the etcd client before the attempt.
failpoint.Inject("closeClient", func(val failpoint.Value) {
if val.(bool) {
if err := etcdCli.Close(); err != nil {
failpoint.Return(etcdSession, errors.Trace(err))
}
}
})

// Test-only failpoint: closes the client's active gRPC connection
// before the attempt.
failpoint.Inject("closeGrpc", func(val failpoint.Value) {
if val.(bool) {
if err := etcdCli.ActiveConnection().Close(); err != nil {
failpoint.Return(etcdSession, errors.Trace(err))
}
}
})

// Time each attempt and record it, labelled by logPrefix and the result.
startTime := time.Now()
etcdSession, err = concurrency.NewSession(etcdCli,
concurrency.WithTTL(ttl), concurrency.WithContext(ctx))
metrics.NewSessionHistogram.WithLabelValues(logPrefix, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
if err == nil {
break
}
// Only log every logIntervalCnt-th failure (starting with the first) so
// that a long retry sequence does not flood the log.
if failedCnt%logIntervalCnt == 0 {
logutil.BgLogger().Warn("failed to new session to etcd", zap.String("ownerInfo", logPrefix), zap.Error(err))
}

// Fixed back-off between attempts; note that it also runs after the
// final failed attempt and is not interrupted by ctx cancellation.
time.Sleep(newSessionRetryInterval)
failedCnt++
}
return etcdSession, errors.Trace(err)
}

// CampaignOwner implements Manager.CampaignOwner interface.
func (m *ownerManager) CampaignOwner() error {
logPrefix := fmt.Sprintf("[%s] %s", m.prompt, m.key)
logutil.BgLogger().Info("start campaign owner", zap.String("ownerInfo", logPrefix))
session, err := NewSession(m.ctx, logPrefix, m.etcdCli, NewSessionDefaultRetryCnt, ManagerSessionTTL)
session, err := util2.NewSession(m.ctx, logPrefix, m.etcdCli, util2.NewSessionDefaultRetryCnt, ManagerSessionTTL)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -246,7 +198,7 @@ func (m *ownerManager) campaignLoop(etcdSession *concurrency.Session) {
case <-etcdSession.Done():
logutil.Logger(logCtx).Info("etcd session is done, creates a new one")
leaseID := etcdSession.Lease()
etcdSession, err = NewSession(ctx, logPrefix, m.etcdCli, NewSessionRetryUnlimited, ManagerSessionTTL)
etcdSession, err = util2.NewSession(ctx, logPrefix, m.etcdCli, util2.NewSessionRetryUnlimited, ManagerSessionTTL)
if err != nil {
logutil.Logger(logCtx).Info("break campaign loop, NewSession failed", zap.Error(err))
m.revokeSession(logPrefix, leaseID)
Expand Down Expand Up @@ -372,21 +324,3 @@ func init() {
logutil.BgLogger().Warn("set manager session TTL failed", zap.Error(err))
}
}

// contextDone reports whether session creation should stop. It returns a
// traced ctx.Err() when ctx is already done, a traced err when err itself is a
// cancellation-like error, and nil otherwise.
func contextDone(ctx context.Context, err error) error {
	select {
	case <-ctx.Done():
		return errors.Trace(ctx.Err())
	default:
		// ctx is still live; fall through and inspect err.
	}

	// The etcd client can be closed while ctx is still open, so an error that
	// signals cancellation, deadline expiry, or a closing gRPC connection is
	// handled the same way as a finished context.
	// TODO: Make sure ctx is closed with etcd client.
	switch {
	case terror.ErrorEqual(err, context.Canceled),
		terror.ErrorEqual(err, context.DeadlineExceeded),
		terror.ErrorEqual(err, grpc.ErrClientConnClosing):
		return errors.Trace(err)
	default:
		return nil
	}
}
5 changes: 5 additions & 0 deletions owner/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,8 @@ func (m *mockManager) ResignOwner(ctx context.Context) error {
}
return nil
}

// RequireOwner implements Manager.RequireOwner interface.
// NOTE(review): the mock ignores its context argument and always returns nil;
// it performs no ownership check.
func (m *mockManager) RequireOwner(context.Context) error {
return nil
}
12 changes: 6 additions & 6 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ type session struct {
sessionManager util.SessionManager

statsCollector *handle.SessionStatsCollector
// ddlOwnerChecker is used in `select tidb_is_ddl_owner()` statement;
ddlOwnerChecker owner.DDLOwnerChecker
// ddlOwnerManager is used in `select tidb_is_ddl_owner()` statement;
ddlOwnerManager owner.Manager
// lockedTables is used to record the table locks held by the session.
lockedTables map[int64]model.TableLockTpInfo

Expand Down Expand Up @@ -306,9 +306,9 @@ func (s *session) ReleaseAllTableLocks() {
s.lockedTables = make(map[int64]model.TableLockTpInfo)
}

// DDLOwnerChecker returns s.ddlOwnerChecker.
func (s *session) DDLOwnerChecker() owner.DDLOwnerChecker {
return s.ddlOwnerChecker
// IsDDLOwner checks whether this session is DDL owner.
func (s *session) IsDDLOwner() bool {
return s.ddlOwnerManager.IsOwner()
}

func (s *session) cleanRetryInfo() {
Expand Down Expand Up @@ -2987,7 +2987,7 @@ func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) {
s := &session{
store: store,
sessionVars: variable.NewSessionVars(),
ddlOwnerChecker: dom.DDL().OwnerManager(),
ddlOwnerManager: dom.DDL().OwnerManager(),
client: store.GetClient(),
mppClient: store.GetMPPClient(),
stmtStats: stmtstats.CreateStatementStats(),
Expand Down
Loading

0 comments on commit 20ecaef

Please sign in to comment.