diff --git a/ddl/util/syncer.go b/ddl/util/syncer.go index aa3cad4976fb4..f403089941141 100644 --- a/ddl/util/syncer.go +++ b/ddl/util/syncer.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/metrics" - "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/parser/terror" tidbutil "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/logutil" @@ -172,7 +171,7 @@ func (s *schemaVersionSyncer) Init(ctx context.Context) error { return errors.Trace(err) } logPrefix := fmt.Sprintf("[%s] %s", ddlPrompt, s.selfSchemaVerPath) - session, err := owner.NewSession(ctx, logPrefix, s.etcdCli, owner.NewSessionDefaultRetryCnt, SyncerSessionTTL) + session, err := tidbutil.NewSession(ctx, logPrefix, s.etcdCli, tidbutil.NewSessionDefaultRetryCnt, SyncerSessionTTL) if err != nil { return errors.Trace(err) } @@ -217,7 +216,7 @@ func (s *schemaVersionSyncer) Restart(ctx context.Context) error { logPrefix := fmt.Sprintf("[%s] %s", ddlPrompt, s.selfSchemaVerPath) // NewSession's context will affect the exit of the session. - session, err := owner.NewSession(ctx, logPrefix, s.etcdCli, owner.NewSessionRetryUnlimited, SyncerSessionTTL) + session, err := tidbutil.NewSession(ctx, logPrefix, s.etcdCli, tidbutil.NewSessionRetryUnlimited, SyncerSessionTTL) if err != nil { return errors.Trace(err) } diff --git a/ddl/util/syncer_test.go b/ddl/util/syncer_test.go index 75ca82c5bd698..6e837d59f559f 100644 --- a/ddl/util/syncer_test.go +++ b/ddl/util/syncer_test.go @@ -25,7 +25,6 @@ import ( . "github.com/pingcap/tidb/ddl" . "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/infoschema" - "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/util" @@ -162,7 +161,7 @@ func TestSyncerSimple(t *testing.T) { NeededCleanTTL = int64(11) ttlKey := "session_ttl_key" ttlVal := "session_ttl_val" - session, err := owner.NewSession(ctx, "", cli, owner.NewSessionDefaultRetryCnt, ttl) + session, err := util.NewSession(ctx, "", cli, util.NewSessionDefaultRetryCnt, ttl) require.NoError(t, err) require.NoError(t, PutKVToEtcd(context.Background(), cli, 5, ttlKey, ttlVal, clientv3.WithLease(session.Lease()))) diff --git a/domain/infosync/info.go b/domain/infosync/info.go index 5b8edd2b4a53b..bbe1c38b8bddc 100644 --- a/domain/infosync/info.go +++ b/domain/infosync/info.go @@ -37,7 +37,6 @@ import ( "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" - "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" @@ -196,14 +195,14 @@ func GlobalInfoSyncerInit(ctx context.Context, id string, serverIDGetter func() // Init creates a new etcd session and stores server info to etcd. func (is *InfoSyncer) init(ctx context.Context, skipRegisterToDashboard bool) error { - err := is.newSessionAndStoreServerInfo(ctx, owner.NewSessionDefaultRetryCnt) + err := is.newSessionAndStoreServerInfo(ctx, util2.NewSessionDefaultRetryCnt) if err != nil { return err } if skipRegisterToDashboard { return nil } - return is.newTopologySessionAndStoreServerInfo(ctx, owner.NewSessionDefaultRetryCnt) + return is.newTopologySessionAndStoreServerInfo(ctx, util2.NewSessionDefaultRetryCnt) } // SetSessionManager set the session manager for InfoSyncer. @@ -683,12 +682,12 @@ func (is *InfoSyncer) TopologyDone() <-chan struct{} { // Restart restart the info syncer with new session leaseID and store server info to etcd again. func (is *InfoSyncer) Restart(ctx context.Context) error { - return is.newSessionAndStoreServerInfo(ctx, owner.NewSessionDefaultRetryCnt) + return is.newSessionAndStoreServerInfo(ctx, util2.NewSessionDefaultRetryCnt) } // RestartTopology restart the topology syncer with new session leaseID and store server info to etcd again. func (is *InfoSyncer) RestartTopology(ctx context.Context) error { - return is.newTopologySessionAndStoreServerInfo(ctx, owner.NewSessionDefaultRetryCnt) + return is.newTopologySessionAndStoreServerInfo(ctx, util2.NewSessionDefaultRetryCnt) } // GetAllTiDBTopology gets all tidb topology @@ -718,7 +717,7 @@ func (is *InfoSyncer) newSessionAndStoreServerInfo(ctx context.Context, retryCnt return nil } logPrefix := fmt.Sprintf("[Info-syncer] %s", is.serverInfoPath) - session, err := owner.NewSession(ctx, logPrefix, is.etcdCli, retryCnt, InfoSessionTTL) + session, err := util2.NewSession(ctx, logPrefix, is.etcdCli, retryCnt, InfoSessionTTL) if err != nil { return err } @@ -737,7 +736,7 @@ func (is *InfoSyncer) newTopologySessionAndStoreServerInfo(ctx context.Context, return nil } logPrefix := fmt.Sprintf("[topology-syncer] %s/%s:%d", TopologyInformationPath, is.info.IP, is.info.Port) - session, err := owner.NewSession(ctx, logPrefix, is.etcdCli, retryCnt, TopologySessionTTL) + session, err := util2.NewSession(ctx, logPrefix, is.etcdCli, retryCnt, TopologySessionTTL) if err != nil { return err } diff --git a/domain/infosync/info_test.go b/domain/infosync/info_test.go index 5042e897c1fd6..07b01bc03a7f9 100644 --- a/domain/infosync/info_test.go +++ b/domain/infosync/info_test.go @@ -28,8 +28,8 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/ddl/util" - "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/parser/model" + util2 "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/testbridge" "github.com/stretchr/testify/require" "go.etcd.io/etcd/tests/v3/integration" @@ -69,7 +69,7 @@ func TestTopology(t *testing.T) { info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, false) require.NoError(t, err) - err = info.newTopologySessionAndStoreServerInfo(ctx, owner.NewSessionDefaultRetryCnt) + err = info.newTopologySessionAndStoreServerInfo(ctx, util2.NewSessionDefaultRetryCnt) require.NoError(t, err) topology, err := info.getTopologyFromEtcd(ctx) @@ -84,7 +84,7 @@ func TestTopology(t *testing.T) { nonTTLKey := fmt.Sprintf("%s/%s:%v/info", TopologyInformationPath, info.info.IP, info.info.Port) ttlKey := fmt.Sprintf("%s/%s:%v/ttl", TopologyInformationPath, info.info.IP, info.info.Port) - err = util.DeleteKeyFromEtcd(nonTTLKey, client, owner.NewSessionDefaultRetryCnt, time.Second) + err = util.DeleteKeyFromEtcd(nonTTLKey, client, util2.NewSessionDefaultRetryCnt, time.Second) require.NoError(t, err) // Refresh and re-test if the key exists @@ -107,7 +107,7 @@ func TestTopology(t *testing.T) { require.NoError(t, err) require.True(t, ttlExists) - err = util.DeleteKeyFromEtcd(ttlKey, client, owner.NewSessionDefaultRetryCnt, time.Second) + err = util.DeleteKeyFromEtcd(ttlKey, client, util2.NewSessionDefaultRetryCnt, time.Second) require.NoError(t, err) err = info.updateTopologyAliveness(ctx) diff --git a/expression/builtin_info.go b/expression/builtin_info.go index f450bb46ecb65..1513b6bee5386 100644 --- a/expression/builtin_info.go +++ b/expression/builtin_info.go @@ -478,8 +478,7 @@ func (b *builtinTiDBIsDDLOwnerSig) Clone() builtinFunc { // evalInt evals a builtinTiDBIsDDLOwnerSig. func (b *builtinTiDBIsDDLOwnerSig) evalInt(_ chunk.Row) (res int64, isNull bool, err error) { - ddlOwnerChecker := b.ctx.DDLOwnerChecker() - if ddlOwnerChecker.IsOwner() { + if b.ctx.IsDDLOwner() { res = 1 } diff --git a/expression/builtin_info_vec.go b/expression/builtin_info_vec.go index c4343f669e60b..e9384eaafc8c6 100644 --- a/expression/builtin_info_vec.go +++ b/expression/builtin_info_vec.go @@ -179,9 +179,8 @@ func (b *builtinTiDBIsDDLOwnerSig) vectorized() bool { func (b *builtinTiDBIsDDLOwnerSig) vecEvalInt(input *chunk.Chunk, result *chunk.Column) error { n := input.NumRows() - ddlOwnerChecker := b.ctx.DDLOwnerChecker() var res int64 - if ddlOwnerChecker.IsOwner() { + if b.ctx.IsDDLOwner() { res = 1 } result.ResizeInt64(n, false) diff --git a/expression/integration_test.go b/expression/integration_test.go index e3f2f0c628274..8d34c90ab2b06 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -2870,10 +2870,8 @@ func TestTiDBIsOwnerFunc(t *testing.T) { tk := testkit.NewTestKit(t, store) result := tk.MustQuery("select tidb_is_ddl_owner()") - ddlOwnerChecker := tk.Session().DDLOwnerChecker() - require.NotNil(t, ddlOwnerChecker) var ret int64 - if ddlOwnerChecker.IsOwner() { + if tk.Session().IsDDLOwner() { ret = 1 } result.Check(testkit.Rows(fmt.Sprintf("%v", ret))) diff --git a/owner/fail_test.go b/owner/fail_test.go index 4f93fe278c002..5a6badfbbcb7b 100644 --- a/owner/fail_test.go +++ b/owner/fail_test.go @@ -74,9 +74,9 @@ func TestFailNewSession(t *testing.T) { if cli != nil { _ = cli.Close() } - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/owner/closeClient")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/closeClient")) }() - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/owner/closeClient", `return(true)`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/closeClient", `return(true)`)) // TODO: It takes more than 2s here in etcd client, the CI takes 5s to run this test. // The config is hard coded, not way to control it outside. @@ -84,7 +84,7 @@ func TestFailNewSession(t *testing.T) { // https://github.com/etcd-io/etcd/blob/ae9734e/clientv3/concurrency/session.go#L38 // https://github.com/etcd-io/etcd/blob/ae9734ed278b7a1a7dfc82e800471ebbf9fce56f/clientv3/client.go#L253 // https://github.com/etcd-io/etcd/blob/ae9734ed278b7a1a7dfc82e800471ebbf9fce56f/clientv3/retry_interceptor.go#L63 - _, err = NewSession(context.Background(), "fail_new_session", cli, retryCnt, ManagerSessionTTL) + _, err = util.NewSession(context.Background(), "fail_new_session", cli, retryCnt, ManagerSessionTTL) isContextDone := terror.ErrorEqual(grpc.ErrClientConnClosing, err) || terror.ErrorEqual(context.Canceled, err) require.Truef(t, isContextDone, "err %v", err) }() @@ -99,13 +99,13 @@ func TestFailNewSession(t *testing.T) { if cli != nil { _ = cli.Close() } - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/owner/closeGrpc")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/closeGrpc")) }() - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/owner/closeGrpc", `return(true)`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/closeGrpc", `return(true)`)) // TODO: It takes more than 2s here in etcd client, the CI takes 5s to run this test. // The config is hard coded, not way to control it outside. - _, err = NewSession(context.Background(), "fail_new_session", cli, retryCnt, ManagerSessionTTL) + _, err = util.NewSession(context.Background(), "fail_new_session", cli, retryCnt, ManagerSessionTTL) isContextDone := terror.ErrorEqual(grpc.ErrClientConnClosing, err) || terror.ErrorEqual(context.Canceled, err) require.Truef(t, isContextDone, "err %v", err) }() diff --git a/owner/manager.go b/owner/manager.go index 0459f81686d6e..f90dd4cebdd2d 100644 --- a/owner/manager.go +++ b/owner/manager.go @@ -17,7 +17,6 @@ package owner import ( "context" "fmt" - "math" "os" "strconv" "sync" @@ -26,21 +25,15 @@ import ( "unsafe" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/terror" + util2 "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/logutil" "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" "go.uber.org/zap" - "google.golang.org/grpc" -) - -const ( - newSessionRetryInterval = 200 * time.Millisecond - logIntervalCnt = int(3 * time.Second / newSessionRetryInterval) ) // Manager is used to campaign the owner and manage the owner information. @@ -59,14 +52,12 @@ type Manager interface { ResignOwner(ctx context.Context) error // Cancel cancels this etcd ownerManager campaign. Cancel() + // RequireOwner requires the ownerManager is owner. + RequireOwner(ctx context.Context) error } const ( - // NewSessionDefaultRetryCnt is the default retry times when create new session. - NewSessionDefaultRetryCnt = 3 - // NewSessionRetryUnlimited is the unlimited retry times when create new session. - NewSessionRetryUnlimited = math.MaxInt64 - keyOpDefaultTimeout = 5 * time.Second + keyOpDefaultTimeout = 5 * time.Second ) // DDLOwnerChecker is used to check whether tidb is owner. @@ -121,6 +112,11 @@ func (m *ownerManager) Cancel() { m.wg.Wait() } +// RequireOwner implements Manager.RequireOwner interface. +func (m *ownerManager) RequireOwner(ctx context.Context) error { + return nil +} + // ManagerSessionTTL is the etcd session's TTL in seconds. It's exported for testing. var ManagerSessionTTL = 60 @@ -138,55 +134,11 @@ func setManagerSessionTTL() error { return nil } -// NewSession creates a new etcd session. -func NewSession(ctx context.Context, logPrefix string, etcdCli *clientv3.Client, retryCnt, ttl int) (*concurrency.Session, error) { - var err error - - var etcdSession *concurrency.Session - failedCnt := 0 - for i := 0; i < retryCnt; i++ { - if err = contextDone(ctx, err); err != nil { - return etcdSession, errors.Trace(err) - } - - failpoint.Inject("closeClient", func(val failpoint.Value) { - if val.(bool) { - if err := etcdCli.Close(); err != nil { - failpoint.Return(etcdSession, errors.Trace(err)) - } - } - }) - - failpoint.Inject("closeGrpc", func(val failpoint.Value) { - if val.(bool) { - if err := etcdCli.ActiveConnection().Close(); err != nil { - failpoint.Return(etcdSession, errors.Trace(err)) - } - } - }) - - startTime := time.Now() - etcdSession, err = concurrency.NewSession(etcdCli, - concurrency.WithTTL(ttl), concurrency.WithContext(ctx)) - metrics.NewSessionHistogram.WithLabelValues(logPrefix, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds()) - if err == nil { - break - } - if failedCnt%logIntervalCnt == 0 { - logutil.BgLogger().Warn("failed to new session to etcd", zap.String("ownerInfo", logPrefix), zap.Error(err)) - } - - time.Sleep(newSessionRetryInterval) - failedCnt++ - } - return etcdSession, errors.Trace(err) -} - // CampaignOwner implements Manager.CampaignOwner interface. func (m *ownerManager) CampaignOwner() error { logPrefix := fmt.Sprintf("[%s] %s", m.prompt, m.key) logutil.BgLogger().Info("start campaign owner", zap.String("ownerInfo", logPrefix)) - session, err := NewSession(m.ctx, logPrefix, m.etcdCli, NewSessionDefaultRetryCnt, ManagerSessionTTL) + session, err := util2.NewSession(m.ctx, logPrefix, m.etcdCli, util2.NewSessionDefaultRetryCnt, ManagerSessionTTL) if err != nil { return errors.Trace(err) } @@ -246,7 +198,7 @@ func (m *ownerManager) campaignLoop(etcdSession *concurrency.Session) { case <-etcdSession.Done(): logutil.Logger(logCtx).Info("etcd session is done, creates a new one") leaseID := etcdSession.Lease() - etcdSession, err = NewSession(ctx, logPrefix, m.etcdCli, NewSessionRetryUnlimited, ManagerSessionTTL) + etcdSession, err = util2.NewSession(ctx, logPrefix, m.etcdCli, util2.NewSessionRetryUnlimited, ManagerSessionTTL) if err != nil { logutil.Logger(logCtx).Info("break campaign loop, NewSession failed", zap.Error(err)) m.revokeSession(logPrefix, leaseID) @@ -372,21 +324,3 @@ func init() { logutil.BgLogger().Warn("set manager session TTL failed", zap.Error(err)) } } - -func contextDone(ctx context.Context, err error) error { - select { - case <-ctx.Done(): - return errors.Trace(ctx.Err()) - default: - } - // Sometime the ctx isn't closed, but the etcd client is closed, - // we need to treat it as if context is done. - // TODO: Make sure ctx is closed with etcd client. - if terror.ErrorEqual(err, context.Canceled) || - terror.ErrorEqual(err, context.DeadlineExceeded) || - terror.ErrorEqual(err, grpc.ErrClientConnClosing) { - return errors.Trace(err) - } - - return nil -} diff --git a/owner/mock.go b/owner/mock.go index 622ca2b632d6c..c13ff88f3fdf6 100644 --- a/owner/mock.go +++ b/owner/mock.go @@ -86,3 +86,8 @@ func (m *mockManager) ResignOwner(ctx context.Context) error { } return nil } + +// RequireOwner implements Manager.RequireOwner interface. +func (m *mockManager) RequireOwner(context.Context) error { + return nil +} diff --git a/session/session.go b/session/session.go index f042324b73498..ccd35ecce3db2 100644 --- a/session/session.go +++ b/session/session.go @@ -219,8 +219,8 @@ type session struct { sessionManager util.SessionManager statsCollector *handle.SessionStatsCollector - // ddlOwnerChecker is used in `select tidb_is_ddl_owner()` statement; - ddlOwnerChecker owner.DDLOwnerChecker + // ddlOwnerManager is used in `select tidb_is_ddl_owner()` statement; + ddlOwnerManager owner.Manager // lockedTables use to record the table locks hold by the session. lockedTables map[int64]model.TableLockTpInfo @@ -306,9 +306,9 @@ func (s *session) ReleaseAllTableLocks() { s.lockedTables = make(map[int64]model.TableLockTpInfo) } -// DDLOwnerChecker returns s.ddlOwnerChecker. -func (s *session) DDLOwnerChecker() owner.DDLOwnerChecker { - return s.ddlOwnerChecker +// IsDDLOwner checks whether this session is DDL owner. +func (s *session) IsDDLOwner() bool { + return s.ddlOwnerManager.IsOwner() } func (s *session) cleanRetryInfo() { @@ -2987,7 +2987,7 @@ func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) { s := &session{ store: store, sessionVars: variable.NewSessionVars(), - ddlOwnerChecker: dom.DDL().OwnerManager(), + ddlOwnerManager: dom.DDL().OwnerManager(), client: store.GetClient(), mppClient: store.GetMPPClient(), stmtStats: stmtstats.CreateStatementStats(), diff --git a/sessionctx/context.go b/sessionctx/context.go index a188eac69081e..0b4948877aeb1 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" - "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util" @@ -118,8 +117,8 @@ type Context interface { StmtRollback() // StmtGetMutation gets the binlog mutation for current statement. StmtGetMutation(int64) *binlog.TableMutation - // DDLOwnerChecker returns owner.DDLOwnerChecker. - DDLOwnerChecker() owner.DDLOwnerChecker + // IsDDLOwner checks whether this session is DDL owner. + IsDDLOwner() bool // AddTableLock adds table lock to the session lock map. AddTableLock([]model.TableLockTpInfo) // ReleaseTableLocks releases table locks in the session lock map. diff --git a/util/etcd.go b/util/etcd.go new file mode 100644 index 0000000000000..8657241e4c178 --- /dev/null +++ b/util/etcd.go @@ -0,0 +1,103 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "context" + "math" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/metrics" + "github.com/pingcap/tidb/parser/terror" + "github.com/pingcap/tidb/util/logutil" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/concurrency" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +const ( + newSessionRetryInterval = 200 * time.Millisecond + logIntervalCnt = int(3 * time.Second / newSessionRetryInterval) + + // NewSessionDefaultRetryCnt is the default retry times when create new session. + NewSessionDefaultRetryCnt = 3 + // NewSessionRetryUnlimited is the unlimited retry times when create new session. + NewSessionRetryUnlimited = math.MaxInt64 +) + +// NewSession creates a new etcd session. +func NewSession(ctx context.Context, logPrefix string, etcdCli *clientv3.Client, retryCnt, ttl int) (*concurrency.Session, error) { + var err error + + var etcdSession *concurrency.Session + failedCnt := 0 + for i := 0; i < retryCnt; i++ { + if err = contextDone(ctx, err); err != nil { + return etcdSession, errors.Trace(err) + } + + failpoint.Inject("closeClient", func(val failpoint.Value) { + if val.(bool) { + if err := etcdCli.Close(); err != nil { + failpoint.Return(etcdSession, errors.Trace(err)) + } + } + }) + + failpoint.Inject("closeGrpc", func(val failpoint.Value) { + if val.(bool) { + if err := etcdCli.ActiveConnection().Close(); err != nil { + failpoint.Return(etcdSession, errors.Trace(err)) + } + } + }) + + startTime := time.Now() + etcdSession, err = concurrency.NewSession(etcdCli, + concurrency.WithTTL(ttl), concurrency.WithContext(ctx)) + metrics.NewSessionHistogram.WithLabelValues(logPrefix, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds()) + if err == nil { + break + } + if failedCnt%logIntervalCnt == 0 { + logutil.BgLogger().Warn("failed to new session to etcd", zap.String("ownerInfo", logPrefix), zap.Error(err)) + } + + time.Sleep(newSessionRetryInterval) + failedCnt++ + } + return etcdSession, errors.Trace(err) +} + +func contextDone(ctx context.Context, err error) error { + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + default: + } + // Sometime the ctx isn't closed, but the etcd client is closed, + // we need to treat it as if context is done. + // TODO: Make sure ctx is closed with etcd client. + if terror.ErrorEqual(err, context.Canceled) || + terror.ErrorEqual(err, context.DeadlineExceeded) || + terror.ErrorEqual(err, grpc.ErrClientConnClosing) { + return errors.Trace(err) + } + + return nil +} diff --git a/util/mock/context.go b/util/mock/context.go index 05fb15b2f933f..54658dc6aa4a8 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -23,7 +23,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx" @@ -108,13 +107,9 @@ func (c *Context) ShowProcess() *util.ProcessInfo { return &util.ProcessInfo{} } -type mockDDLOwnerChecker struct{} - -func (c *mockDDLOwnerChecker) IsOwner() bool { return true } - -// DDLOwnerChecker returns owner.DDLOwnerChecker. -func (c *Context) DDLOwnerChecker() owner.DDLOwnerChecker { - return &mockDDLOwnerChecker{} +// IsDDLOwner checks whether this session is DDL owner. +func (c *Context) IsDDLOwner() bool { + return true } // SetValue implements sessionctx.Context SetValue interface.