From dfe5d0ba3aa3df61d8ba8eb6a76dd642cadaa767 Mon Sep 17 00:00:00 2001 From: Shuaipeng Yu Date: Thu, 9 Jan 2020 11:31:05 +0800 Subject: [PATCH 1/3] store: fix potential panic in GC worker Signed-off-by: Shuaipeng Yu --- store/tikv/gcworker/gc_worker.go | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/store/tikv/gcworker/gc_worker.go b/store/tikv/gcworker/gc_worker.go index 11e4231074186..85f516d222668 100644 --- a/store/tikv/gcworker/gc_worker.go +++ b/store/tikv/gcworker/gc_worker.go @@ -1067,42 +1067,39 @@ func (w *GCWorker) doGC(ctx context.Context, safePoint uint64, concurrency int) func (w *GCWorker) checkLeader() (bool, error) { metrics.GCWorkerCounter.WithLabelValues("check_leader").Inc() - se := createSession(w.store) - defer se.Close() - ctx := context.Background() - _, err := se.Execute(ctx, "BEGIN") + _, err := w.session.Execute(ctx, "BEGIN") if err != nil { return false, errors.Trace(err) } - w.session = se leader, err := w.loadValueFromSysTable(gcLeaderUUIDKey) if err != nil { - se.RollbackTxn(ctx) + w.session.RollbackTxn(ctx) return false, errors.Trace(err) } logutil.BgLogger().Debug("[gc worker] got leader", zap.String("uuid", leader)) if leader == w.uuid { err = w.saveTime(gcLeaderLeaseKey, time.Now().Add(gcWorkerLease)) if err != nil { - se.RollbackTxn(ctx) + w.session.RollbackTxn(ctx) return false, errors.Trace(err) } - err = se.CommitTxn(ctx) + err = w.session.CommitTxn(ctx) if err != nil { return false, errors.Trace(err) } return true, nil } - se.RollbackTxn(ctx) + w.session.RollbackTxn(ctx) - _, err = se.Execute(ctx, "BEGIN") + _, err = w.session.Execute(ctx, "BEGIN") if err != nil { return false, errors.Trace(err) } lease, err := w.loadTime(gcLeaderLeaseKey) if err != nil { + w.session.RollbackTxn(ctx) return false, errors.Trace(err) } if lease == nil || lease.Before(time.Now()) { @@ -1112,26 +1109,26 @@ func (w *GCWorker) checkLeader() (bool, error) { err = w.saveValueToSysTable(gcLeaderUUIDKey, w.uuid) if err != nil { - se.RollbackTxn(ctx) + w.session.RollbackTxn(ctx) return false, errors.Trace(err) } err = w.saveValueToSysTable(gcLeaderDescKey, w.desc) if err != nil { - se.RollbackTxn(ctx) + w.session.RollbackTxn(ctx) return false, errors.Trace(err) } err = w.saveTime(gcLeaderLeaseKey, time.Now().Add(gcWorkerLease)) if err != nil { - se.RollbackTxn(ctx) + w.session.RollbackTxn(ctx) return false, errors.Trace(err) } - err = se.CommitTxn(ctx) + err = w.session.CommitTxn(ctx) if err != nil { return false, errors.Trace(err) } return true, nil } - se.RollbackTxn(ctx) + w.session.RollbackTxn(ctx) return false, nil } From 29b1e92382c616ac6a76ad2df74978fa32473d88 Mon Sep 17 00:00:00 2001 From: Shuaipeng Yu Date: Thu, 9 Jan 2020 20:11:36 +0800 Subject: [PATCH 2/3] use function local session instead of object one Signed-off-by: Shuaipeng Yu --- store/tikv/gcworker/gc_worker.go | 58 +++++++++++++-------------- store/tikv/gcworker/gc_worker_test.go | 5 ++- 2 files changed, 31 insertions(+), 32 deletions(-) diff --git a/store/tikv/gcworker/gc_worker.go b/store/tikv/gcworker/gc_worker.go index 85f516d222668..7d183c0f41864 100644 --- a/store/tikv/gcworker/gc_worker.go +++ b/store/tikv/gcworker/gc_worker.go @@ -55,8 +55,6 @@ type GCWorker struct { lastFinish time.Time cancel context.CancelFunc done chan error - - session session.Session } // NewGCWorker creates a GCWorker instance. @@ -155,7 +153,8 @@ func (w *GCWorker) start(ctx context.Context, wg *sync.WaitGroup) { logutil.Logger(ctx).Info("[gc worker] start", zap.String("uuid", w.uuid)) - w.session = createSession(w.store) + se := createSession(w.store) + defer se.Close() w.tick(ctx) // Immediately tick once to initialize configs. wg.Done() @@ -274,18 +273,20 @@ func (w *GCWorker) prepare() (bool, uint64, error) { // 4. GC update `tikv_gc_safe_point` value to t2, continue do GC in this round. // Then the data record that has been dropped between time t1 and t2, will be cleaned by GC, but the user thinks the data after t1 won't be clean by GC. ctx := context.Background() - _, err := w.session.Execute(ctx, "BEGIN") + se := createSession(w.store) + defer se.Close() + _, err := se.Execute(ctx, "BEGIN") if err != nil { return false, 0, errors.Trace(err) } doGC, safePoint, err := w.checkPrepare(ctx) if doGC { - err = w.session.CommitTxn(ctx) + err = se.CommitTxn(ctx) if err != nil { return false, 0, errors.Trace(err) } } else { - w.session.RollbackTxn(ctx) + se.RollbackTxn(ctx) } return doGC, safePoint, errors.Trace(err) } @@ -1067,39 +1068,42 @@ func (w *GCWorker) doGC(ctx context.Context, safePoint uint64, concurrency int) func (w *GCWorker) checkLeader() (bool, error) { metrics.GCWorkerCounter.WithLabelValues("check_leader").Inc() + se := createSession(w.store) + defer se.Close() + ctx := context.Background() - _, err := w.session.Execute(ctx, "BEGIN") + _, err := se.Execute(ctx, "BEGIN") if err != nil { return false, errors.Trace(err) } leader, err := w.loadValueFromSysTable(gcLeaderUUIDKey) if err != nil { - w.session.RollbackTxn(ctx) + se.RollbackTxn(ctx) return false, errors.Trace(err) } logutil.BgLogger().Debug("[gc worker] got leader", zap.String("uuid", leader)) if leader == w.uuid { err = w.saveTime(gcLeaderLeaseKey, time.Now().Add(gcWorkerLease)) if err != nil { - w.session.RollbackTxn(ctx) + se.RollbackTxn(ctx) return false, errors.Trace(err) } - err = w.session.CommitTxn(ctx) + err = se.CommitTxn(ctx) if err != nil { return false, errors.Trace(err) } return true, nil } - w.session.RollbackTxn(ctx) + se.RollbackTxn(ctx) - _, err = w.session.Execute(ctx, "BEGIN") + _, err = se.Execute(ctx, "BEGIN") if err != nil { return false, errors.Trace(err) } lease, err := w.loadTime(gcLeaderLeaseKey) if err != nil { - w.session.RollbackTxn(ctx) + se.RollbackTxn(ctx) return false, errors.Trace(err) } if lease == nil || lease.Before(time.Now()) { @@ -1109,26 +1113,26 @@ func (w *GCWorker) checkLeader() (bool, error) { err = w.saveValueToSysTable(gcLeaderUUIDKey, w.uuid) if err != nil { - w.session.RollbackTxn(ctx) + se.RollbackTxn(ctx) return false, errors.Trace(err) } err = w.saveValueToSysTable(gcLeaderDescKey, w.desc) if err != nil { - w.session.RollbackTxn(ctx) + se.RollbackTxn(ctx) return false, errors.Trace(err) } err = w.saveTime(gcLeaderLeaseKey, time.Now().Add(gcWorkerLease)) if err != nil { - w.session.RollbackTxn(ctx) + se.RollbackTxn(ctx) return false, errors.Trace(err) } - err = w.session.CommitTxn(ctx) + err = se.CommitTxn(ctx) if err != nil { return false, errors.Trace(err) } return true, nil } - w.session.RollbackTxn(ctx) + se.RollbackTxn(ctx) return false, nil } @@ -1199,8 +1203,10 @@ func (w *GCWorker) loadDurationWithDefault(key string, def time.Duration) (*time func (w *GCWorker) loadValueFromSysTable(key string) (string, error) { ctx := context.Background() + se := createSession(w.store) + defer se.Close() stmt := fmt.Sprintf(`SELECT HIGH_PRIORITY (variable_value) FROM mysql.tidb WHERE variable_name='%s' FOR UPDATE`, key) - rs, err := w.session.Execute(ctx, stmt) + rs, err := se.Execute(ctx, stmt) if len(rs) > 0 { defer terror.Call(rs[0].Close) } @@ -1229,10 +1235,9 @@ func (w *GCWorker) saveValueToSysTable(key, value string) error { ON DUPLICATE KEY UPDATE variable_value = '%[2]s', comment = '%[3]s'`, key, value, gcVariableComments[key]) - if w.session == nil { - return errors.New("[saveValueToSysTable session is nil]") - } - _, err := w.session.Execute(context.Background(), stmt) + se := createSession(w.store) + defer se.Close() + _, err := se.Execute(context.Background(), stmt) logutil.BgLogger().Debug("[gc worker] save kv", zap.String("key", key), zap.String("value", value), @@ -1323,13 +1328,6 @@ func NewMockGCWorker(store tikv.Storage) (*MockGCWorker, error) { lastFinish: time.Now(), done: make(chan error), } - worker.session, err = session.CreateSession(worker.store) - if err != nil { - logutil.BgLogger().Error("initialize MockGCWorker session fail", zap.Error(err)) - return nil, errors.Trace(err) - } - privilege.BindPrivilegeManager(worker.session, nil) - worker.session.GetSessionVars().InRestrictedSQL = true return &MockGCWorker{worker: worker}, nil } diff --git a/store/tikv/gcworker/gc_worker_test.go b/store/tikv/gcworker/gc_worker_test.go index 747e8caf13238..480650c29f204 100644 --- a/store/tikv/gcworker/gc_worker_test.go +++ b/store/tikv/gcworker/gc_worker_test.go @@ -445,7 +445,9 @@ const ( func (s *testGCWorkerSuite) testDeleteRangesFailureImpl(c *C, failType int) { // Put some delete range tasks. - _, err := s.gcWorker.session.Execute(context.Background(), `INSERT INTO mysql.gc_delete_range VALUES + se := createSession(s.gcWorker.store) + defer se.Close() + _, err := se.Execute(context.Background(), `INSERT INTO mysql.gc_delete_range VALUES ("1", "2", "31", "32", "10"), ("3", "4", "33", "34", "10"), ("5", "6", "35", "36", "10")`) @@ -473,7 +475,6 @@ func (s *testGCWorkerSuite) testDeleteRangesFailureImpl(c *C, failType int) { } // Check the delete range tasks. - se := createSession(s.gcWorker.store) preparedRanges, err := util.LoadDeleteRanges(se, 20) se.Close() c.Assert(err, IsNil) From 6f976d6c2482b2d711f2e5de33b1ca416b38f19a Mon Sep 17 00:00:00 2001 From: Shuaipeng Yu Date: Fri, 10 Jan 2020 14:48:50 +0800 Subject: [PATCH 3/3] address comments Signed-off-by: Shuaipeng Yu --- store/tikv/gcworker/gc_worker.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/store/tikv/gcworker/gc_worker.go b/store/tikv/gcworker/gc_worker.go index 7d183c0f41864..349c374b80e86 100644 --- a/store/tikv/gcworker/gc_worker.go +++ b/store/tikv/gcworker/gc_worker.go @@ -153,9 +153,6 @@ func (w *GCWorker) start(ctx context.Context, wg *sync.WaitGroup) { logutil.Logger(ctx).Info("[gc worker] start", zap.String("uuid", w.uuid)) - se := createSession(w.store) - defer se.Close() - w.tick(ctx) // Immediately tick once to initialize configs. wg.Done()