Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: fix potential panic in GC worker #14403

Merged
merged 5 commits into from
Jan 10, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 12 additions & 20 deletions store/tikv/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ type GCWorker struct {
lastFinish time.Time
cancel context.CancelFunc
done chan error

session session.Session
}

// NewGCWorker creates a GCWorker instance.
Expand Down Expand Up @@ -155,8 +153,6 @@ func (w *GCWorker) start(ctx context.Context, wg *sync.WaitGroup) {
logutil.Logger(ctx).Info("[gc worker] start",
zap.String("uuid", w.uuid))

w.session = createSession(w.store)

w.tick(ctx) // Immediately tick once to initialize configs.
wg.Done()

Expand Down Expand Up @@ -274,18 +270,20 @@ func (w *GCWorker) prepare() (bool, uint64, error) {
// 4. GC update `tikv_gc_safe_point` value to t2, continue do GC in this round.
// Then the data record that has been dropped between time t1 and t2, will be cleaned by GC, but the user thinks the data after t1 won't be clean by GC.
ctx := context.Background()
_, err := w.session.Execute(ctx, "BEGIN")
se := createSession(w.store)
defer se.Close()
_, err := se.Execute(ctx, "BEGIN")
if err != nil {
return false, 0, errors.Trace(err)
}
doGC, safePoint, err := w.checkPrepare(ctx)
if doGC {
err = w.session.CommitTxn(ctx)
err = se.CommitTxn(ctx)
if err != nil {
return false, 0, errors.Trace(err)
}
} else {
w.session.RollbackTxn(ctx)
se.RollbackTxn(ctx)
}
return doGC, safePoint, errors.Trace(err)
}
Expand Down Expand Up @@ -1075,7 +1073,6 @@ func (w *GCWorker) checkLeader() (bool, error) {
if err != nil {
return false, errors.Trace(err)
}
w.session = se
leader, err := w.loadValueFromSysTable(gcLeaderUUIDKey)
if err != nil {
se.RollbackTxn(ctx)
Expand Down Expand Up @@ -1103,6 +1100,7 @@ func (w *GCWorker) checkLeader() (bool, error) {
}
lease, err := w.loadTime(gcLeaderLeaseKey)
if err != nil {
se.RollbackTxn(ctx)
return false, errors.Trace(err)
}
if lease == nil || lease.Before(time.Now()) {
Expand Down Expand Up @@ -1202,8 +1200,10 @@ func (w *GCWorker) loadDurationWithDefault(key string, def time.Duration) (*time

func (w *GCWorker) loadValueFromSysTable(key string) (string, error) {
ctx := context.Background()
se := createSession(w.store)
defer se.Close()
stmt := fmt.Sprintf(`SELECT HIGH_PRIORITY (variable_value) FROM mysql.tidb WHERE variable_name='%s' FOR UPDATE`, key)
rs, err := w.session.Execute(ctx, stmt)
rs, err := se.Execute(ctx, stmt)
if len(rs) > 0 {
defer terror.Call(rs[0].Close)
}
Expand Down Expand Up @@ -1232,10 +1232,9 @@ func (w *GCWorker) saveValueToSysTable(key, value string) error {
ON DUPLICATE KEY
UPDATE variable_value = '%[2]s', comment = '%[3]s'`,
key, value, gcVariableComments[key])
if w.session == nil {
return errors.New("[saveValueToSysTable session is nil]")
}
_, err := w.session.Execute(context.Background(), stmt)
se := createSession(w.store)
defer se.Close()
_, err := se.Execute(context.Background(), stmt)
logutil.BgLogger().Debug("[gc worker] save kv",
zap.String("key", key),
zap.String("value", value),
Expand Down Expand Up @@ -1326,13 +1325,6 @@ func NewMockGCWorker(store tikv.Storage) (*MockGCWorker, error) {
lastFinish: time.Now(),
done: make(chan error),
}
worker.session, err = session.CreateSession(worker.store)
if err != nil {
logutil.BgLogger().Error("initialize MockGCWorker session fail", zap.Error(err))
return nil, errors.Trace(err)
}
privilege.BindPrivilegeManager(worker.session, nil)
worker.session.GetSessionVars().InRestrictedSQL = true
return &MockGCWorker{worker: worker}, nil
}

Expand Down
5 changes: 3 additions & 2 deletions store/tikv/gcworker/gc_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,9 @@ const (

func (s *testGCWorkerSuite) testDeleteRangesFailureImpl(c *C, failType int) {
// Put some delete range tasks.
_, err := s.gcWorker.session.Execute(context.Background(), `INSERT INTO mysql.gc_delete_range VALUES
se := createSession(s.gcWorker.store)
defer se.Close()
_, err := se.Execute(context.Background(), `INSERT INTO mysql.gc_delete_range VALUES
("1", "2", "31", "32", "10"),
("3", "4", "33", "34", "10"),
("5", "6", "35", "36", "10")`)
Expand Down Expand Up @@ -473,7 +475,6 @@ func (s *testGCWorkerSuite) testDeleteRangesFailureImpl(c *C, failType int) {
}

// Check the delete range tasks.
se := createSession(s.gcWorker.store)
preparedRanges, err := util.LoadDeleteRanges(se, 20)
se.Close()
c.Assert(err, IsNil)
Expand Down