
Commit

Merge branch 'release-5.3' into release-5.3-2810c1d55ddb
rleungx authored Jun 14, 2022
2 parents 26aecf1 + 70ac9ef commit 3176d23
Showing 16 changed files with 387 additions and 39 deletions.
32 changes: 27 additions & 5 deletions br/pkg/lightning/restore/meta_manager.go
@@ -1034,15 +1034,18 @@ func (m noopTableMetaMgr) FinishTable(ctx context.Context) error {
return nil
}

-type singleMgrBuilder struct{}
+type singleMgrBuilder struct {
+	taskID int64
+}

func (b singleMgrBuilder) Init(context.Context) error {
return nil
}

func (b singleMgrBuilder) TaskMetaMgr(pd *pdutil.PdController) taskMetaMgr {
return &singleTaskMetaMgr{
-pd: pd,
+pd:     pd,
+taskID: b.taskID,
}
}

@@ -1051,15 +1054,34 @@ func (b singleMgrBuilder) TableMetaMgr(tr *TableRestore) tableMetaMgr {
}

type singleTaskMetaMgr struct {
-pd *pdutil.PdController
+pd           *pdutil.PdController
+taskID       int64
+initialized  bool
+sourceBytes  uint64
+clusterAvail uint64
}

func (m *singleTaskMetaMgr) InitTask(ctx context.Context, source int64) error {
+m.sourceBytes = uint64(source)
+m.initialized = true
return nil
}

func (m *singleTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error {
-_, err := action(nil)
+newTasks, err := action([]taskMeta{
+	{
+		taskID:       m.taskID,
+		status:       taskMetaStatusInitial,
+		sourceBytes:  m.sourceBytes,
+		clusterAvail: m.clusterAvail,
+	},
+})
+for _, t := range newTasks {
+	if m.taskID == t.taskID {
+		m.sourceBytes = t.sourceBytes
+		m.clusterAvail = t.clusterAvail
+	}
+}
return err
}

@@ -1068,7 +1090,7 @@ func (m *singleTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdut
}

func (m *singleTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) {
-return true, nil
+return m.initialized, nil
}

func (m *singleTaskMetaMgr) CheckAndFinishRestore(context.Context, bool) (shouldSwitchBack bool, shouldCleanupMeta bool, err error) {
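With these changes the single-task manager carries the task ID and keeps whatever the exclusive callback returns. A minimal sketch of how a caller inside the restore package could use that round trip (hypothetical helper, not part of this commit):

// adjustClusterAvail is illustrative only: CheckTasksExclusively now passes the
// callback a one-element slice describing this task, and any returned task with a
// matching taskID is written back into the manager's in-memory state.
func adjustClusterAvail(ctx context.Context, mgr taskMetaMgr, avail uint64) error {
	return mgr.CheckTasksExclusively(ctx, func(tasks []taskMeta) ([]taskMeta, error) {
		for i := range tasks {
			tasks[i].clusterAvail = avail
		}
		return tasks, nil
	})
}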
26 changes: 26 additions & 0 deletions br/pkg/lightning/restore/meta_manager_test.go
@@ -6,6 +6,7 @@ import (
"context"
"database/sql/driver"
"sort"
"time"

"github.com/DATA-DOG/go-sqlmock"
. "github.com/pingcap/check"
@@ -323,3 +324,28 @@ func (s *taskMetaMgrSuite) TestCheckTasksExclusively(c *C) {
c.Assert(err, IsNil)

}

func (s *taskMetaMgrSuite) TestSingleTaskMetaMgr(c *C) {
metaBuilder := singleMgrBuilder{
taskID: time.Now().UnixNano(),
}
metaMgr := metaBuilder.TaskMetaMgr(nil)

ok, err := metaMgr.CheckTaskExist(context.Background())
c.Assert(err, IsNil)
c.Assert(ok, IsFalse)

err = metaMgr.InitTask(context.Background(), 1<<30)
c.Assert(err, IsNil)

ok, err = metaMgr.CheckTaskExist(context.Background())
c.Assert(err, IsNil)
c.Assert(ok, IsTrue)

err = metaMgr.CheckTasksExclusively(context.Background(), func(tasks []taskMeta) ([]taskMeta, error) {
c.Assert(len(tasks), Equals, 1)
c.Assert(tasks[0].sourceBytes, Equals, uint64(1<<30))
return nil, nil
})
c.Assert(err, IsNil)
}
18 changes: 16 additions & 2 deletions br/pkg/lightning/restore/restore.go
@@ -389,7 +389,9 @@ func NewRestoreControllerWithPauser(
needChecksum: cfg.PostRestore.Checksum != config.OpLevelOff,
}
case isSSTImport:
-metaBuilder = singleMgrBuilder{}
+metaBuilder = singleMgrBuilder{
+	taskID: cfg.TaskID,
+}
default:
metaBuilder = noopMetaMgrBuilder{}
}
@@ -1893,7 +1895,19 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
if err = rc.taskMgr.InitTask(ctx, source); err != nil {
return errors.Trace(err)
}
-if rc.cfg.App.CheckRequirements {
+}
+if rc.cfg.App.CheckRequirements {
+	needCheck := true
+	if rc.cfg.Checkpoint.Enable {
+		taskCheckpoints, err := rc.checkpointsDB.TaskCheckpoint(ctx)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		// If task checkpoint is initialized, it means check has been performed before.
+		// We don't need and shouldn't check again, because lightning may have already imported some data.
+		needCheck = taskCheckpoints == nil
+	}
+	if needCheck {
err = rc.localResource(source)
if err != nil {
return errors.Trace(err)
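In short, preCheckRequirements now runs the local-resource check only on a first run. A condensed sketch of that gating rule (hypothetical helper, not in the patch):

// needLocalResourceCheck mirrors the logic above: when checkpoints are enabled and a
// task checkpoint already exists, the check has run before and data may already be
// imported, so it must not be repeated.
func needLocalResourceCheck(checkpointEnabled, taskCheckpointExists bool) bool {
	if !checkpointEnabled {
		return true
	}
	return !taskCheckpointExists
}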
3 changes: 3 additions & 0 deletions executor/builder.go
@@ -1671,13 +1671,16 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
}

case strings.ToLower(infoschema.TableSlowQuery), strings.ToLower(infoschema.ClusterTableSlowLog):
memTracker := memory.NewTracker(v.ID(), -1)
memTracker.AttachTo(b.ctx.GetSessionVars().StmtCtx.MemTracker)
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
table: v.Table,
retriever: &slowQueryRetriever{
table: v.Table,
outputCols: v.Columns,
extractor: v.Extractor.(*plannercore.SlowQueryExtractor),
memTracker: memTracker,
},
}
case strings.ToLower(infoschema.TableStorageStats):
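The slow-query retriever now gets its own memory tracker, parented to the statement-level tracker so that whatever it consumes counts toward the session's memory quota. A minimal sketch of that wiring, assuming TiDB's util/memory package (hypothetical helper; the real wiring is the buildMemTable change above):

package executor // sketch only

import "github.com/pingcap/tidb/util/memory"

// newRetrieverTracker creates a per-executor tracker with no individual limit (-1)
// and attaches it to the statement tracker, so its consumption is accounted upward.
func newRetrieverTracker(stmtTracker *memory.Tracker, executorID int) *memory.Tracker {
	t := memory.NewTracker(executorID, -1)
	t.AttachTo(stmtTracker)
	return t
}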
64 changes: 53 additions & 11 deletions executor/slow_query.go
@@ -67,8 +67,12 @@ type slowQueryRetriever struct {
checker *slowLogChecker
columnValueFactoryMap map[string]slowQueryColumnValueFactory

-taskList chan slowLogTask
-stats    *slowQueryRuntimeStats
+taskList      chan slowLogTask
+stats         *slowQueryRuntimeStats
+memTracker    *memory.Tracker
+lastFetchSize int64
+cancel        context.CancelFunc
+wg            sync.WaitGroup
}

func (e *slowQueryRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
@@ -77,6 +81,7 @@ func (e *slowQueryRetriever) retrieve(ctx context.Context, sctx sessionctx.Conte
if err != nil {
return nil, err
}
ctx, e.cancel = context.WithCancel(ctx)
e.initializeAsyncParsing(ctx, sctx)
}
return e.dataForSlowLog(ctx, sctx)
@@ -142,6 +147,10 @@ func (e *slowQueryRetriever) close() error {
logutil.BgLogger().Error("close slow log file failed.", zap.Error(err))
}
}
if e.cancel != nil {
e.cancel()
}
e.wg.Wait()
return nil
}

@@ -184,6 +193,7 @@ func (e *slowQueryRetriever) getPreviousFile() *os.File {
}

func (e *slowQueryRetriever) parseDataForSlowLog(ctx context.Context, sctx sessionctx.Context) {
defer e.wg.Done()
file := e.getNextFile()
if file == nil {
close(e.taskList)
@@ -198,6 +208,8 @@ func (e *slowQueryRetriever) dataForSlowLog(ctx context.Context, sctx sessionctx
task slowLogTask
ok bool
)
e.memConsume(-e.lastFetchSize)
e.lastFetchSize = 0
for {
select {
case task, ok = <-e.taskList:
@@ -215,6 +227,7 @@ func (e *slowQueryRetriever) dataForSlowLog(ctx context.Context, sctx sessionctx
if len(rows) == 0 {
continue
}
e.lastFetchSize = calculateDatumsSize(rows)
return rows, nil
}
}
@@ -413,7 +426,6 @@ func decomposeToSlowLogTasks(logs []slowLogBlock, num int) [][]string {

func (e *slowQueryRetriever) parseSlowLog(ctx context.Context, sctx sessionctx.Context, reader *bufio.Reader, logNum int) {
defer close(e.taskList)
-var wg sync.WaitGroup
offset := offset{offset: 0, length: 0}
// To limit the num of go routine
concurrent := sctx.GetSessionVars().Concurrency.DistSQLScanConcurrency()
@@ -439,7 +451,7 @@ func (e *slowQueryRetriever) parseSlowLog(ctx context.Context, sctx sessionctx.C
return
case e.taskList <- t:
}
-e.sendParsedSlowLogCh(ctx, t, parsedSlowLog{nil, err})
+e.sendParsedSlowLogCh(t, parsedSlowLog{nil, err})
}
if len(logs) == 0 || len(logs[0]) == 0 {
break
@@ -452,13 +464,17 @@ func (e *slowQueryRetriever) parseSlowLog(ctx context.Context, sctx sessionctx.C
t := slowLogTask{}
t.resultCh = make(chan parsedSlowLog, 1)
start := offset
-wg.Add(1)
ch <- 1
-e.taskList <- t
+select {
+case <-ctx.Done():
+	return
+case e.taskList <- t:
+}
+e.wg.Add(1)
go func() {
-defer wg.Done()
+defer e.wg.Done()
result, err := e.parseLog(ctx, sctx, log, start)
-e.sendParsedSlowLogCh(ctx, t, parsedSlowLog{result, err})
+e.sendParsedSlowLogCh(t, parsedSlowLog{result, err})
<-ch
}()
offset.offset = e.fileLine
@@ -470,13 +486,12 @@ func (e *slowQueryRetriever) parseSlowLog(ctx context.Context, sctx sessionctx.C
}
}
}
-wg.Wait()
}

-func (e *slowQueryRetriever) sendParsedSlowLogCh(ctx context.Context, t slowLogTask, re parsedSlowLog) {
+func (e *slowQueryRetriever) sendParsedSlowLogCh(t slowLogTask, re parsedSlowLog) {
select {
case t.resultCh <- re:
-case <-ctx.Done():
+default:
return
}
}
@@ -523,6 +538,8 @@ func splitByColon(line string) (fields []string, values []string) {

func (e *slowQueryRetriever) parseLog(ctx context.Context, sctx sessionctx.Context, log []string, offset offset) (data [][]types.Datum, err error) {
start := time.Now()
logSize := calculateLogSize(log)
defer e.memConsume(-logSize)
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("%s", r)
@@ -535,6 +552,7 @@ func (e *slowQueryRetriever) parseLog(ctx context.Context, sctx sessionctx.Conte
atomic.AddInt64(&e.stats.parseLog, int64(time.Since(start)))
}
}()
e.memConsume(logSize)
failpoint.Inject("errorMockParseSlowLogPanic", func(val failpoint.Value) {
if val.(bool) {
panic("panic test")
@@ -611,6 +629,7 @@ func (e *slowQueryRetriever) parseLog(ctx context.Context, sctx sessionctx.Conte
// Get the sql string, and mark the start flag to false.
_ = e.setColumnValue(sctx, row, tz, variable.SlowLogQuerySQLStr, string(hack.Slice(line)), e.checker, fileLine)
e.setDefaultValue(row)
e.memConsume(types.EstimatedMemUsage(row, 1))
data = append(data, row)
startFlag = false
} else {
@@ -1077,5 +1096,28 @@ func readLastLines(ctx context.Context, file *os.File, endCursor int64) ([]strin

func (e *slowQueryRetriever) initializeAsyncParsing(ctx context.Context, sctx sessionctx.Context) {
e.taskList = make(chan slowLogTask, 1)
e.wg.Add(1)
go e.parseDataForSlowLog(ctx, sctx)
}

func calculateLogSize(log []string) int64 {
size := 0
for _, line := range log {
size += len(line)
}
return int64(size)
}

func calculateDatumsSize(rows [][]types.Datum) int64 {
size := int64(0)
for _, row := range rows {
size += types.EstimatedMemUsage(row, 1)
}
return size
}

func (e *slowQueryRetriever) memConsume(bytes int64) {
if e.memTracker != nil {
e.memTracker.Consume(bytes)
}
}
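Taken together, the retriever charges the tracker as slow-log entries are parsed and releases the previous batch when the next one is fetched, while memConsume tolerates a nil tracker (e.g. in unit tests). A simplified sketch of that accounting pattern, assuming the same util/memory package (illustrative only, not the actual slowQueryRetriever):

package executor // sketch only

import "github.com/pingcap/tidb/util/memory"

// batchRetriever tracks the size of the batch currently held by the caller and
// releases it when the next batch is requested.
type batchRetriever struct {
	memTracker    *memory.Tracker
	lastFetchSize int64
}

func (r *batchRetriever) memConsume(bytes int64) {
	if r.memTracker != nil { // the tracker is optional, e.g. in unit tests
		r.memTracker.Consume(bytes)
	}
}

func (r *batchRetriever) fetch(next func() (rows [][]byte, size int64)) [][]byte {
	r.memConsume(-r.lastFetchSize) // the caller is done with the previous batch
	r.lastFetchSize = 0
	rows, size := next()
	r.memConsume(size)
	r.lastFetchSize = size
	return rows
}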
2 changes: 1 addition & 1 deletion go.mod
@@ -65,7 +65,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
-github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211029104011-2fd3841894de
+github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211221095044-38d30d5632a8
github.com/tikv/pd v1.1.0-beta.0.20211104095303-69c86d05d379
github.com/twmb/murmur3 v1.1.3
github.com/uber/jaeger-client-go v2.22.1+incompatible
4 changes: 2 additions & 2 deletions go.sum
@@ -712,8 +712,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfK
github.com/tidwall/gjson v1.3.5/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls=
github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
-github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211029104011-2fd3841894de h1:DKo2grkDpP9hQHuYbkAz4yxMS1742qBkUd4kwyZK2As=
-github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211029104011-2fd3841894de/go.mod h1:gdd4S4uS3/apOF9iet/DIYUdr6J4WzGLWyDgn6SMtg0=
+github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211221095044-38d30d5632a8 h1:dzXPACk+j3i6uih8chDeW9VkVHjEkJyK8eTic+UBhWQ=
+github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211221095044-38d30d5632a8/go.mod h1:gdd4S4uS3/apOF9iet/DIYUdr6J4WzGLWyDgn6SMtg0=
github.com/tikv/pd v1.1.0-beta.0.20211029083450-e65f0c55b6ae/go.mod h1:varH0IE0jJ9E9WN2Ei/N6pajMlPkcXdDEf7f5mmsUVQ=
github.com/tikv/pd v1.1.0-beta.0.20211104095303-69c86d05d379 h1:nFm1jQDz1iRktoyV2SyM5zVk6+PJHQNunJZ7ZJcqzAo=
github.com/tikv/pd v1.1.0-beta.0.20211104095303-69c86d05d379/go.mod h1:y+09hAUXJbrd4c0nktL74zXDDuD7atGtfOKxL90PCOE=