From 27c726495e32612213d503a1a5b6cb25cd39a4d8 Mon Sep 17 00:00:00 2001 From: Jack Yu Date: Fri, 10 Jan 2020 15:24:43 +0800 Subject: [PATCH] store: fix potential panic in GC worker (#14403) --- store/tikv/gcworker/gc_worker.go | 32 ++++++++++----------------- store/tikv/gcworker/gc_worker_test.go | 5 +++-- 2 files changed, 15 insertions(+), 22 deletions(-) diff --git a/store/tikv/gcworker/gc_worker.go b/store/tikv/gcworker/gc_worker.go index 11e4231074186..349c374b80e86 100644 --- a/store/tikv/gcworker/gc_worker.go +++ b/store/tikv/gcworker/gc_worker.go @@ -55,8 +55,6 @@ type GCWorker struct { lastFinish time.Time cancel context.CancelFunc done chan error - - session session.Session } // NewGCWorker creates a GCWorker instance. @@ -155,8 +153,6 @@ func (w *GCWorker) start(ctx context.Context, wg *sync.WaitGroup) { logutil.Logger(ctx).Info("[gc worker] start", zap.String("uuid", w.uuid)) - w.session = createSession(w.store) - w.tick(ctx) // Immediately tick once to initialize configs. wg.Done() @@ -274,18 +270,20 @@ func (w *GCWorker) prepare() (bool, uint64, error) { // 4. GC update `tikv_gc_safe_point` value to t2, continue do GC in this round. // Then the data record that has been dropped between time t1 and t2, will be cleaned by GC, but the user thinks the data after t1 won't be clean by GC. ctx := context.Background() - _, err := w.session.Execute(ctx, "BEGIN") + se := createSession(w.store) + defer se.Close() + _, err := se.Execute(ctx, "BEGIN") if err != nil { return false, 0, errors.Trace(err) } doGC, safePoint, err := w.checkPrepare(ctx) if doGC { - err = w.session.CommitTxn(ctx) + err = se.CommitTxn(ctx) if err != nil { return false, 0, errors.Trace(err) } } else { - w.session.RollbackTxn(ctx) + se.RollbackTxn(ctx) } return doGC, safePoint, errors.Trace(err) } @@ -1075,7 +1073,6 @@ func (w *GCWorker) checkLeader() (bool, error) { if err != nil { return false, errors.Trace(err) } - w.session = se leader, err := w.loadValueFromSysTable(gcLeaderUUIDKey) if err != nil { se.RollbackTxn(ctx) @@ -1103,6 +1100,7 @@ func (w *GCWorker) checkLeader() (bool, error) { } lease, err := w.loadTime(gcLeaderLeaseKey) if err != nil { + se.RollbackTxn(ctx) return false, errors.Trace(err) } if lease == nil || lease.Before(time.Now()) { @@ -1202,8 +1200,10 @@ func (w *GCWorker) loadDurationWithDefault(key string, def time.Duration) (*time func (w *GCWorker) loadValueFromSysTable(key string) (string, error) { ctx := context.Background() + se := createSession(w.store) + defer se.Close() stmt := fmt.Sprintf(`SELECT HIGH_PRIORITY (variable_value) FROM mysql.tidb WHERE variable_name='%s' FOR UPDATE`, key) - rs, err := w.session.Execute(ctx, stmt) + rs, err := se.Execute(ctx, stmt) if len(rs) > 0 { defer terror.Call(rs[0].Close) } @@ -1232,10 +1232,9 @@ func (w *GCWorker) saveValueToSysTable(key, value string) error { ON DUPLICATE KEY UPDATE variable_value = '%[2]s', comment = '%[3]s'`, key, value, gcVariableComments[key]) - if w.session == nil { - return errors.New("[saveValueToSysTable session is nil]") - } - _, err := w.session.Execute(context.Background(), stmt) + se := createSession(w.store) + defer se.Close() + _, err := se.Execute(context.Background(), stmt) logutil.BgLogger().Debug("[gc worker] save kv", zap.String("key", key), zap.String("value", value), @@ -1326,13 +1325,6 @@ func NewMockGCWorker(store tikv.Storage) (*MockGCWorker, error) { lastFinish: time.Now(), done: make(chan error), } - worker.session, err = session.CreateSession(worker.store) - if err != nil { - logutil.BgLogger().Error("initialize MockGCWorker session fail", zap.Error(err)) - return nil, errors.Trace(err) - } - privilege.BindPrivilegeManager(worker.session, nil) - worker.session.GetSessionVars().InRestrictedSQL = true return &MockGCWorker{worker: worker}, nil } diff --git a/store/tikv/gcworker/gc_worker_test.go b/store/tikv/gcworker/gc_worker_test.go index 747e8caf13238..480650c29f204 100644 --- a/store/tikv/gcworker/gc_worker_test.go +++ b/store/tikv/gcworker/gc_worker_test.go @@ -445,7 +445,9 @@ const ( func (s *testGCWorkerSuite) testDeleteRangesFailureImpl(c *C, failType int) { // Put some delete range tasks. - _, err := s.gcWorker.session.Execute(context.Background(), `INSERT INTO mysql.gc_delete_range VALUES + se := createSession(s.gcWorker.store) + defer se.Close() + _, err := se.Execute(context.Background(), `INSERT INTO mysql.gc_delete_range VALUES ("1", "2", "31", "32", "10"), ("3", "4", "33", "34", "10"), ("5", "6", "35", "36", "10")`) @@ -473,7 +475,6 @@ func (s *testGCWorkerSuite) testDeleteRangesFailureImpl(c *C, failType int) { } // Check the delete range tasks. - se := createSession(s.gcWorker.store) preparedRanges, err := util.LoadDeleteRanges(se, 20) se.Close() c.Assert(err, IsNil)