From b29826a5f8a111c56492a53989d493f546935e22 Mon Sep 17 00:00:00 2001 From: Jack Yu Date: Fri, 10 Jan 2020 15:24:43 +0800 Subject: [PATCH] store: fix potential panic in GC worker (#14403) --- store/tikv/gcworker/gc_worker.go | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/store/tikv/gcworker/gc_worker.go b/store/tikv/gcworker/gc_worker.go index 4381445b53dc3..a8a5163e23b12 100644 --- a/store/tikv/gcworker/gc_worker.go +++ b/store/tikv/gcworker/gc_worker.go @@ -53,8 +53,6 @@ type GCWorker struct { lastFinish time.Time cancel context.CancelFunc done chan error - - session session.Session } // NewGCWorker creates a GCWorker instance. @@ -136,8 +134,6 @@ func (w *GCWorker) start(ctx context.Context, wg *sync.WaitGroup) { logutil.Logger(ctx).Info("[gc worker] start", zap.String("uuid", w.uuid)) - w.session = createSession(w.store) - w.tick(ctx) // Immediately tick once to initialize configs. wg.Done() @@ -864,7 +860,6 @@ func (w *GCWorker) checkLeader() (bool, error) { if err != nil { return false, errors.Trace(err) } - w.session = se leader, err := w.loadValueFromSysTable(gcLeaderUUIDKey) if err != nil { _, err1 := se.Execute(ctx, "ROLLBACK") @@ -895,6 +890,7 @@ func (w *GCWorker) checkLeader() (bool, error) { } lease, err := w.loadTime(gcLeaderLeaseKey) if err != nil { + se.RollbackTxn(ctx) return false, errors.Trace(err) } if lease == nil || lease.Before(time.Now()) { @@ -998,8 +994,10 @@ func (w *GCWorker) loadDurationWithDefault(key string, def time.Duration) (*time func (w *GCWorker) loadValueFromSysTable(key string) (string, error) { ctx := context.Background() + se := createSession(w.store) + defer se.Close() stmt := fmt.Sprintf(`SELECT HIGH_PRIORITY (variable_value) FROM mysql.tidb WHERE variable_name='%s' FOR UPDATE`, key) - rs, err := w.session.Execute(ctx, stmt) + rs, err := se.Execute(ctx, stmt) if len(rs) > 0 { defer terror.Call(rs[0].Close) } @@ -1028,10 +1026,9 @@ func (w *GCWorker) saveValueToSysTable(key, value string) error { ON DUPLICATE KEY UPDATE variable_value = '%[2]s', comment = '%[3]s'`, key, value, gcVariableComments[key]) - if w.session == nil { - return errors.New("[saveValueToSysTable session is nil]") - } - _, err := w.session.Execute(context.Background(), stmt) + se := createSession(w.store) + defer se.Close() + _, err := se.Execute(context.Background(), stmt) logutil.Logger(context.Background()).Debug("[gc worker] save kv", zap.String("key", key), zap.String("value", value), @@ -1084,13 +1081,6 @@ func NewMockGCWorker(store tikv.Storage) (*MockGCWorker, error) { lastFinish: time.Now(), done: make(chan error), } - worker.session, err = session.CreateSession(worker.store) - if err != nil { - logutil.Logger(context.Background()).Error("initialize MockGCWorker session fail", zap.Error(err)) - return nil, errors.Trace(err) - } - privilege.BindPrivilegeManager(worker.session, nil) - worker.session.GetSessionVars().InRestrictedSQL = true return &MockGCWorker{worker: worker}, nil }