Skip to content

Commit

Permalink
store: fix potential panic in GC worker (#14403)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackysp authored and sre-bot committed Jan 10, 2020
1 parent 2dcc032 commit 27c7264
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 22 deletions.
32 changes: 12 additions & 20 deletions store/tikv/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ type GCWorker struct {
lastFinish time.Time
cancel context.CancelFunc
done chan error

session session.Session
}

// NewGCWorker creates a GCWorker instance.
Expand Down Expand Up @@ -155,8 +153,6 @@ func (w *GCWorker) start(ctx context.Context, wg *sync.WaitGroup) {
logutil.Logger(ctx).Info("[gc worker] start",
zap.String("uuid", w.uuid))

w.session = createSession(w.store)

w.tick(ctx) // Immediately tick once to initialize configs.
wg.Done()

Expand Down Expand Up @@ -274,18 +270,20 @@ func (w *GCWorker) prepare() (bool, uint64, error) {
// 4. GC update `tikv_gc_safe_point` value to t2, continue do GC in this round.
// Then the data record that has been dropped between time t1 and t2, will be cleaned by GC, but the user thinks the data after t1 won't be clean by GC.
ctx := context.Background()
_, err := w.session.Execute(ctx, "BEGIN")
se := createSession(w.store)
defer se.Close()
_, err := se.Execute(ctx, "BEGIN")
if err != nil {
return false, 0, errors.Trace(err)
}
doGC, safePoint, err := w.checkPrepare(ctx)
if doGC {
err = w.session.CommitTxn(ctx)
err = se.CommitTxn(ctx)
if err != nil {
return false, 0, errors.Trace(err)
}
} else {
w.session.RollbackTxn(ctx)
se.RollbackTxn(ctx)
}
return doGC, safePoint, errors.Trace(err)
}
Expand Down Expand Up @@ -1075,7 +1073,6 @@ func (w *GCWorker) checkLeader() (bool, error) {
if err != nil {
return false, errors.Trace(err)
}
w.session = se
leader, err := w.loadValueFromSysTable(gcLeaderUUIDKey)
if err != nil {
se.RollbackTxn(ctx)
Expand Down Expand Up @@ -1103,6 +1100,7 @@ func (w *GCWorker) checkLeader() (bool, error) {
}
lease, err := w.loadTime(gcLeaderLeaseKey)
if err != nil {
se.RollbackTxn(ctx)
return false, errors.Trace(err)
}
if lease == nil || lease.Before(time.Now()) {
Expand Down Expand Up @@ -1202,8 +1200,10 @@ func (w *GCWorker) loadDurationWithDefault(key string, def time.Duration) (*time

func (w *GCWorker) loadValueFromSysTable(key string) (string, error) {
ctx := context.Background()
se := createSession(w.store)
defer se.Close()
stmt := fmt.Sprintf(`SELECT HIGH_PRIORITY (variable_value) FROM mysql.tidb WHERE variable_name='%s' FOR UPDATE`, key)
rs, err := w.session.Execute(ctx, stmt)
rs, err := se.Execute(ctx, stmt)
if len(rs) > 0 {
defer terror.Call(rs[0].Close)
}
Expand Down Expand Up @@ -1232,10 +1232,9 @@ func (w *GCWorker) saveValueToSysTable(key, value string) error {
ON DUPLICATE KEY
UPDATE variable_value = '%[2]s', comment = '%[3]s'`,
key, value, gcVariableComments[key])
if w.session == nil {
return errors.New("[saveValueToSysTable session is nil]")
}
_, err := w.session.Execute(context.Background(), stmt)
se := createSession(w.store)
defer se.Close()
_, err := se.Execute(context.Background(), stmt)
logutil.BgLogger().Debug("[gc worker] save kv",
zap.String("key", key),
zap.String("value", value),
Expand Down Expand Up @@ -1326,13 +1325,6 @@ func NewMockGCWorker(store tikv.Storage) (*MockGCWorker, error) {
lastFinish: time.Now(),
done: make(chan error),
}
worker.session, err = session.CreateSession(worker.store)
if err != nil {
logutil.BgLogger().Error("initialize MockGCWorker session fail", zap.Error(err))
return nil, errors.Trace(err)
}
privilege.BindPrivilegeManager(worker.session, nil)
worker.session.GetSessionVars().InRestrictedSQL = true
return &MockGCWorker{worker: worker}, nil
}

Expand Down
5 changes: 3 additions & 2 deletions store/tikv/gcworker/gc_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,9 @@ const (

func (s *testGCWorkerSuite) testDeleteRangesFailureImpl(c *C, failType int) {
// Put some delete range tasks.
_, err := s.gcWorker.session.Execute(context.Background(), `INSERT INTO mysql.gc_delete_range VALUES
se := createSession(s.gcWorker.store)
defer se.Close()
_, err := se.Execute(context.Background(), `INSERT INTO mysql.gc_delete_range VALUES
("1", "2", "31", "32", "10"),
("3", "4", "33", "34", "10"),
("5", "6", "35", "36", "10")`)
Expand Down Expand Up @@ -473,7 +475,6 @@ func (s *testGCWorkerSuite) testDeleteRangesFailureImpl(c *C, failType int) {
}

// Check the delete range tasks.
se := createSession(s.gcWorker.store)
preparedRanges, err := util.LoadDeleteRanges(se, 20)
se.Close()
c.Assert(err, IsNil)
Expand Down

0 comments on commit 27c7264

Please sign in to comment.