Skip to content

Commit

Permalink
ttl: force to kill SQL in scan task when canceling TTL job/task (#56518)
Browse files Browse the repository at this point in the history
close #56511
  • Loading branch information
lcwangchao authored Oct 11, 2024
1 parent f4e820d commit e68c26a
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 3 deletions.
4 changes: 3 additions & 1 deletion pkg/ttl/session/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"//pkg/ttl/metrics",
"//pkg/util/chunk",
"//pkg/util/sqlexec",
"//pkg/util/sqlkiller",
"//pkg/util/timeutil",
"@com_github_pingcap_errors//:errors",
],
Expand All @@ -29,12 +30,13 @@ go_test(
"sysvar_test.go",
],
flaky = True,
shard_count = 6,
shard_count = 7,
deps = [
":session",
"//pkg/sessionctx/variable",
"//pkg/testkit",
"//pkg/testkit/testsetup",
"//pkg/util",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//require",
"@org_uber_go_goleak//:goleak",
Expand Down
8 changes: 8 additions & 0 deletions pkg/ttl/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/pkg/ttl/metrics"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/pingcap/tidb/pkg/util/sqlkiller"
"github.com/pingcap/tidb/pkg/util/timeutil"
)

Expand All @@ -54,6 +55,8 @@ type Session interface {
ResetWithGlobalTimeZone(ctx context.Context) error
// GlobalTimeZone returns the global timezone. It is used to compute expire time for TTL
GlobalTimeZone(ctx context.Context) (*time.Location, error)
// KillStmt kills the current statement execution
KillStmt()
// Close closes the session
Close()
// Now returns the current time in location specified by session var
Expand Down Expand Up @@ -180,6 +183,11 @@ func (s *session) GlobalTimeZone(ctx context.Context) (*time.Location, error) {
return timeutil.ParseTimeZone(str)
}

// KillStmt kills the current statement execution
func (s *session) KillStmt() {
s.GetSessionVars().SQLKiller.SendKillSignal(sqlkiller.QueryInterrupted)
}

// Close closes the session
func (s *session) Close() {
if s.closeFn != nil {
Expand Down
27 changes: 27 additions & 0 deletions pkg/ttl/session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ package session_test
import (
"context"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/ttl/session"
"github.com/pingcap/tidb/pkg/util"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -64,3 +66,28 @@ func TestSessionResetTimeZone(t *testing.T) {
require.NoError(t, se.ResetWithGlobalTimeZone(context.TODO()))
tk.MustQuery("select @@time_zone").Check(testkit.Rows("UTC"))
}

func TestSessionKill(t *testing.T) {
store, do := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
se := session.NewSession(tk.Session(), tk.Session(), nil)
sleepStmt := "select sleep(123)"
wg := util.WaitGroupWrapper{}
wg.Run(func() {
start := time.Now()
for time.Since(start) < 10*time.Second {
time.Sleep(10 * time.Millisecond)
processes := do.InfoSyncer().GetSessionManager().ShowProcessList()
for _, proc := range processes {
if proc.Info == sleepStmt {
se.KillStmt()
return
}
}
}
require.FailNow(t, "wait sleep stmt timeout")
})
// the killed sleep stmt will return "1"
tk.MustQuery(sleepStmt).Check(testkit.Rows("1"))
wg.Wait()
}
42 changes: 40 additions & 2 deletions pkg/ttl/ttlworker/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,45 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s
if err != nil {
return t.result(err)
}
defer rawSess.Close()

doScanFinished, setDoScanFinished := context.WithCancel(context.Background())
wg := util.WaitGroupWrapper{}
wg.Run(func() {
select {
case <-taskCtx.Done():
case <-ctx.Done():
case <-doScanFinished.Done():
return
}
logger := logutil.BgLogger().With(
zap.Int64("tableID", t.TableID),
zap.String("table", t.tbl.Name.O),
zap.String("partition", t.tbl.Partition.O),
zap.String("jobID", t.JobID),
zap.Int64("scanID", t.ScanID),
)
logger.Info("kill the running statement in scan task because the task or worker cancelled")
rawSess.KillStmt()
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
// Have a small probability that the kill signal will be lost when the session is idle.
// So wait for a while and send the kill signal again if the scan is still running.
select {
case <-doScanFinished.Done():
return
case <-ticker.C:
logger.Warn("scan task is still running after the kill signal sent, kill it again")
rawSess.KillStmt()
}
}
})

defer func() {
setDoScanFinished()
wg.Wait()
rawSess.Close()
}()

safeExpire, err := t.tbl.EvalExpireTime(taskCtx, rawSess, rawSess.Now())
if err != nil {
Expand Down Expand Up @@ -182,7 +220,7 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s
selectInterval := time.Since(sqlStart)
if sqlErr != nil {
metrics.SelectErrorDuration.Observe(selectInterval.Seconds())
needRetry := retryable && retryTimes < scanTaskExecuteSQLMaxRetry && ctx.Err() == nil
needRetry := retryable && retryTimes < scanTaskExecuteSQLMaxRetry && ctx.Err() == nil && t.ctx.Err() == nil
logutil.BgLogger().Error("execute query for ttl scan task failed",
zap.String("SQL", sql),
zap.Int("retryTimes", retryTimes),
Expand Down
49 changes: 49 additions & 0 deletions pkg/ttl/ttlworker/scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/ttl/cache"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -446,3 +447,51 @@ func TestScanTaskCheck(t *testing.T) {
require.Equal(t, 1, len(ch))
require.Equal(t, "Total Rows: 1, Success Rows: 0, Error Rows: 0", task.statistics.String())
}

func TestScanTaskCancelStmt(t *testing.T) {
task := &ttlScanTask{
ctx: context.Background(),
tbl: newMockTTLTbl(t, "t1"),
TTLTask: &cache.TTLTask{
ExpireTime: time.UnixMilli(0),
ScanRangeStart: []types.Datum{types.NewIntDatum(0)},
},
statistics: &ttlStatistics{},
}

testCancel := func(ctx context.Context, doCancel func()) {
mockPool := newMockSessionPool(t)
startExec := make(chan struct{})
mockPool.se.sessionInfoSchema = newMockInfoSchema(task.tbl.TableInfo)
mockPool.se.executeSQL = func(_ context.Context, _ string, _ ...any) ([]chunk.Row, error) {
close(startExec)
select {
case <-mockPool.se.killed:
return nil, errors.New("killed")
case <-time.After(10 * time.Second):
return nil, errors.New("timeout")
}
}
wg := util.WaitGroupWrapper{}
wg.Run(func() {
select {
case <-startExec:
case <-time.After(10 * time.Second):
require.FailNow(t, "timeout")
}
doCancel()
})
r := task.doScan(ctx, nil, mockPool)
require.NotNil(t, r)
require.EqualError(t, r.err, "killed")
wg.Wait()
}

// test cancel with input context
ctx, cancel := context.WithCancel(context.Background())
testCancel(ctx, cancel)

// test cancel with task context
task.ctx, cancel = context.WithCancel(context.Background())
testCancel(context.Background(), cancel)
}
7 changes: 7 additions & 0 deletions pkg/ttl/ttlworker/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ type mockSession struct {
resetTimeZoneCalls int
closed bool
commitErr error
killed chan struct{}
}

func newMockSession(t *testing.T, tbl ...*cache.PhysicalTable) *mockSession {
Expand All @@ -168,6 +169,7 @@ func newMockSession(t *testing.T, tbl ...*cache.PhysicalTable) *mockSession {
t: t,
sessionInfoSchema: newMockInfoSchema(tbls...),
sessionVars: sessVars,
killed: make(chan struct{}),
}
}

Expand Down Expand Up @@ -224,6 +226,11 @@ func (s *mockSession) GlobalTimeZone(_ context.Context) (*time.Location, error)
return time.Local, nil
}

// KillStmt kills the current statement execution
func (s *mockSession) KillStmt() {
close(s.killed)
}

func (s *mockSession) Close() {
s.closed = true
}
Expand Down

0 comments on commit e68c26a

Please sign in to comment.