From e68c26a0e67fd86a305774e57f4b862f1cf8b1d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E8=B6=85?= Date: Fri, 11 Oct 2024 14:41:12 +0800 Subject: [PATCH] ttl: force to kill SQL in scan task when canceling TTL job/task (#56518) close pingcap/tidb#56511 --- pkg/ttl/session/BUILD.bazel | 4 ++- pkg/ttl/session/session.go | 8 +++++ pkg/ttl/session/session_test.go | 27 +++++++++++++++++ pkg/ttl/ttlworker/scan.go | 42 ++++++++++++++++++++++++-- pkg/ttl/ttlworker/scan_test.go | 49 +++++++++++++++++++++++++++++++ pkg/ttl/ttlworker/session_test.go | 7 +++++ 6 files changed, 134 insertions(+), 3 deletions(-) diff --git a/pkg/ttl/session/BUILD.bazel b/pkg/ttl/session/BUILD.bazel index b7abb2e2b6da6..2aa1fb4a2d91e 100644 --- a/pkg/ttl/session/BUILD.bazel +++ b/pkg/ttl/session/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "//pkg/ttl/metrics", "//pkg/util/chunk", "//pkg/util/sqlexec", + "//pkg/util/sqlkiller", "//pkg/util/timeutil", "@com_github_pingcap_errors//:errors", ], @@ -29,12 +30,13 @@ go_test( "sysvar_test.go", ], flaky = True, - shard_count = 6, + shard_count = 7, deps = [ ":session", "//pkg/sessionctx/variable", "//pkg/testkit", "//pkg/testkit/testsetup", + "//pkg/util", "@com_github_pingcap_errors//:errors", "@com_github_stretchr_testify//require", "@org_uber_go_goleak//:goleak", diff --git a/pkg/ttl/session/session.go b/pkg/ttl/session/session.go index c7e74a3b2075a..8a9ea06adba64 100644 --- a/pkg/ttl/session/session.go +++ b/pkg/ttl/session/session.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tidb/pkg/ttl/metrics" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/sqlexec" + "github.com/pingcap/tidb/pkg/util/sqlkiller" "github.com/pingcap/tidb/pkg/util/timeutil" ) @@ -54,6 +55,8 @@ type Session interface { ResetWithGlobalTimeZone(ctx context.Context) error // GlobalTimeZone returns the global timezone. It is used to compute expire time for TTL GlobalTimeZone(ctx context.Context) (*time.Location, error) + // KillStmt kills the current statement execution + KillStmt() // Close closes the session Close() // Now returns the current time in location specified by session var @@ -180,6 +183,11 @@ func (s *session) GlobalTimeZone(ctx context.Context) (*time.Location, error) { return timeutil.ParseTimeZone(str) } +// KillStmt kills the current statement execution +func (s *session) KillStmt() { + s.GetSessionVars().SQLKiller.SendKillSignal(sqlkiller.QueryInterrupted) +} + // Close closes the session func (s *session) Close() { if s.closeFn != nil { diff --git a/pkg/ttl/session/session_test.go b/pkg/ttl/session/session_test.go index 0acd9fae836ca..ab68014be3d7f 100644 --- a/pkg/ttl/session/session_test.go +++ b/pkg/ttl/session/session_test.go @@ -17,10 +17,12 @@ package session_test import ( "context" "testing" + "time" "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/ttl/session" + "github.com/pingcap/tidb/pkg/util" "github.com/stretchr/testify/require" ) @@ -64,3 +66,28 @@ func TestSessionResetTimeZone(t *testing.T) { require.NoError(t, se.ResetWithGlobalTimeZone(context.TODO())) tk.MustQuery("select @@time_zone").Check(testkit.Rows("UTC")) } + +func TestSessionKill(t *testing.T) { + store, do := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + se := session.NewSession(tk.Session(), tk.Session(), nil) + sleepStmt := "select sleep(123)" + wg := util.WaitGroupWrapper{} + wg.Run(func() { + start := time.Now() + for time.Since(start) < 10*time.Second { + time.Sleep(10 * time.Millisecond) + processes := do.InfoSyncer().GetSessionManager().ShowProcessList() + for _, proc := range processes { + if proc.Info == sleepStmt { + se.KillStmt() + return + } + } + } + require.FailNow(t, "wait sleep stmt timeout") + }) + // the killed sleep stmt will return "1" + tk.MustQuery(sleepStmt).Check(testkit.Rows("1")) + wg.Wait() +} diff --git a/pkg/ttl/ttlworker/scan.go b/pkg/ttl/ttlworker/scan.go index d1930685f053d..9a01009e2f15b 100644 --- a/pkg/ttl/ttlworker/scan.go +++ b/pkg/ttl/ttlworker/scan.go @@ -111,7 +111,45 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s if err != nil { return t.result(err) } - defer rawSess.Close() + + doScanFinished, setDoScanFinished := context.WithCancel(context.Background()) + wg := util.WaitGroupWrapper{} + wg.Run(func() { + select { + case <-taskCtx.Done(): + case <-ctx.Done(): + case <-doScanFinished.Done(): + return + } + logger := logutil.BgLogger().With( + zap.Int64("tableID", t.TableID), + zap.String("table", t.tbl.Name.O), + zap.String("partition", t.tbl.Partition.O), + zap.String("jobID", t.JobID), + zap.Int64("scanID", t.ScanID), + ) + logger.Info("kill the running statement in scan task because the task or worker cancelled") + rawSess.KillStmt() + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + for { + // Have a small probability that the kill signal will be lost when the session is idle. + // So wait for a while and send the kill signal again if the scan is still running. + select { + case <-doScanFinished.Done(): + return + case <-ticker.C: + logger.Warn("scan task is still running after the kill signal sent, kill it again") + rawSess.KillStmt() + } + } + }) + + defer func() { + setDoScanFinished() + wg.Wait() + rawSess.Close() + }() safeExpire, err := t.tbl.EvalExpireTime(taskCtx, rawSess, rawSess.Now()) if err != nil { @@ -182,7 +220,7 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s selectInterval := time.Since(sqlStart) if sqlErr != nil { metrics.SelectErrorDuration.Observe(selectInterval.Seconds()) - needRetry := retryable && retryTimes < scanTaskExecuteSQLMaxRetry && ctx.Err() == nil + needRetry := retryable && retryTimes < scanTaskExecuteSQLMaxRetry && ctx.Err() == nil && t.ctx.Err() == nil logutil.BgLogger().Error("execute query for ttl scan task failed", zap.String("SQL", sql), zap.Int("retryTimes", retryTimes), diff --git a/pkg/ttl/ttlworker/scan_test.go b/pkg/ttl/ttlworker/scan_test.go index 137e72fe9a662..461c6ae5f28af 100644 --- a/pkg/ttl/ttlworker/scan_test.go +++ b/pkg/ttl/ttlworker/scan_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/ttl/cache" "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/stretchr/testify/require" ) @@ -446,3 +447,51 @@ func TestScanTaskCheck(t *testing.T) { require.Equal(t, 1, len(ch)) require.Equal(t, "Total Rows: 1, Success Rows: 0, Error Rows: 0", task.statistics.String()) } + +func TestScanTaskCancelStmt(t *testing.T) { + task := &ttlScanTask{ + ctx: context.Background(), + tbl: newMockTTLTbl(t, "t1"), + TTLTask: &cache.TTLTask{ + ExpireTime: time.UnixMilli(0), + ScanRangeStart: []types.Datum{types.NewIntDatum(0)}, + }, + statistics: &ttlStatistics{}, + } + + testCancel := func(ctx context.Context, doCancel func()) { + mockPool := newMockSessionPool(t) + startExec := make(chan struct{}) + mockPool.se.sessionInfoSchema = newMockInfoSchema(task.tbl.TableInfo) + mockPool.se.executeSQL = func(_ context.Context, _ string, _ ...any) ([]chunk.Row, error) { + close(startExec) + select { + case <-mockPool.se.killed: + return nil, errors.New("killed") + case <-time.After(10 * time.Second): + return nil, errors.New("timeout") + } + } + wg := util.WaitGroupWrapper{} + wg.Run(func() { + select { + case <-startExec: + case <-time.After(10 * time.Second): + require.FailNow(t, "timeout") + } + doCancel() + }) + r := task.doScan(ctx, nil, mockPool) + require.NotNil(t, r) + require.EqualError(t, r.err, "killed") + wg.Wait() + } + + // test cancel with input context + ctx, cancel := context.WithCancel(context.Background()) + testCancel(ctx, cancel) + + // test cancel with task context + task.ctx, cancel = context.WithCancel(context.Background()) + testCancel(context.Background(), cancel) +} diff --git a/pkg/ttl/ttlworker/session_test.go b/pkg/ttl/ttlworker/session_test.go index b9022802b1814..55ed1022a49f8 100644 --- a/pkg/ttl/ttlworker/session_test.go +++ b/pkg/ttl/ttlworker/session_test.go @@ -155,6 +155,7 @@ type mockSession struct { resetTimeZoneCalls int closed bool commitErr error + killed chan struct{} } func newMockSession(t *testing.T, tbl ...*cache.PhysicalTable) *mockSession { @@ -168,6 +169,7 @@ func newMockSession(t *testing.T, tbl ...*cache.PhysicalTable) *mockSession { t: t, sessionInfoSchema: newMockInfoSchema(tbls...), sessionVars: sessVars, + killed: make(chan struct{}), } } @@ -224,6 +226,11 @@ func (s *mockSession) GlobalTimeZone(_ context.Context) (*time.Location, error) return time.Local, nil } +// KillStmt kills the current statement execution +func (s *mockSession) KillStmt() { + close(s.killed) +} + func (s *mockSession) Close() { s.closed = true }