ddl: support read records with copr for adding index (#39191)
ref #35983
tangenta authored Nov 18, 2022
1 parent 48e17e2 commit e9253f7
Showing 6 changed files with 374 additions and 2 deletions.
2 changes: 2 additions & 0 deletions ddl/BUILD.bazel
@@ -27,6 +27,7 @@ go_library(
"foreign_key.go",
"generated_column.go",
"index.go",
"index_cop.go",
"index_merge_tmp.go",
"job_table.go",
"mock.go",
@@ -168,6 +169,7 @@ go_test(
"fail_test.go",
"foreign_key_test.go",
"index_change_test.go",
"index_cop_test.go",
"index_merge_tmp_test.go",
"index_modify_test.go",
"integration_test.go",
12 changes: 12 additions & 0 deletions ddl/backfilling.go
@@ -158,6 +158,13 @@ type reorgBackfillTask struct {
endInclude bool
}

func (r *reorgBackfillTask) excludedEndKey() kv.Key {
if r.endInclude {
return r.endKey.Next()
}
return r.endKey
}

func (r *reorgBackfillTask) String() string {
physicalID := strconv.FormatInt(r.physicalTableID, 10)
startKey := tryDecodeToHandleString(r.startKey)
@@ -342,6 +349,11 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) {
w.batchCnt = int(variable.GetDDLReorgBatchSize())
result := w.handleBackfillTask(d, task, bf)
w.resultCh <- result
if result.err != nil {
logutil.BgLogger().Info("[ddl] backfill worker exit on error",
zap.Stringer("type", w.tp), zap.Int("workerID", w.id), zap.Error(result.err))
return
}
}
}

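For context, a minimal standalone sketch (not part of this commit) of the conversion excludedEndKey performs: kv.Key.Next() returns the smallest key strictly greater than the given key, so an inclusive range [start, end] becomes the half-open range [start, end.Next()) that a key-range scan expects. The key bytes below are made up.

package main

import (
	"fmt"

	"github.com/pingcap/tidb/kv"
)

func main() {
	// Hypothetical record key; any byte string behaves the same way.
	end := kv.Key("t\x80\x00\x00\x00\x00\x00\x00\x01")
	fmt.Printf("inclusive end: %X\n", []byte(end))
	fmt.Printf("exclusive end: %X\n", []byte(end.Next())) // Next appends a 0x00 byte
}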
22 changes: 22 additions & 0 deletions ddl/export_test.go
@@ -14,6 +14,28 @@

package ddl

import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/types"
)

func SetBatchInsertDeleteRangeSize(i int) {
batchInsertDeleteRangeSize = i
}

var (
FetchRowsFromCop4Test = fetchRowsFromCop
NewCopContext4Test = newCopContext
)

type (
IndexRecord4Test = *indexRecord
)

func (i IndexRecord4Test) GetHandle() kv.Handle {
return i.handle
}

func (i IndexRecord4Test) GetIndexValues() []types.Datum {
return i.vals
}
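These exports make the coprocessor read path reachable from package ddl_test. Below is a hedged sketch of how a test might drive the hooks; the schema, index name, batch size, and assertions are illustrative and need not match the index_cop_test.go added by this commit.

package ddl_test

import (
	"context"
	"testing"

	"github.com/pingcap/tidb/ddl"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/parser/model"
	"github.com/pingcap/tidb/tablecodec"
	"github.com/pingcap/tidb/testkit"
	"github.com/stretchr/testify/require"
)

func TestFetchRowsFromCopSketch(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("create table t (a int primary key, b int, index idx(b))")
	tk.MustExec("insert into t values (1, 10), (2, 20), (3, 30)")

	tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
	require.NoError(t, err)
	idxInfo := tbl.Meta().FindIndexByName("idx")
	require.NotNil(t, idxInfo)

	copCtx := ddl.NewCopContext4Test(tbl.Meta(), idxInfo, tk.Session())
	require.NotNil(t, copCtx)

	// Scan the whole record range of the table at a fresh snapshot.
	startKey := tablecodec.GenTableRecordPrefix(tbl.Meta().ID)
	endKey := startKey.PrefixNext()
	ver, err := store.CurrentVersion(kv.GlobalTxnScope)
	require.NoError(t, err)

	records, _, done, err := ddl.FetchRowsFromCop4Test(
		context.Background(), copCtx, startKey, endKey, ver.Ver, nil, 10)
	require.NoError(t, err)
	require.True(t, done)
	require.Len(t, records, 3)
	for _, rec := range records {
		// GetHandle and GetIndexValues are the accessors exported above.
		t.Log(rec.GetHandle(), rec.GetIndexValues())
	}
}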
17 changes: 15 additions & 2 deletions ddl/index.go
@@ -1180,6 +1180,7 @@ type baseIndexWorker struct {
type addIndexWorker struct {
baseIndexWorker
index table.Index
copCtx *copContext
writerCtx *ingest.WriterContext

// The following attributes are used to reduce memory allocation.
@@ -1200,6 +1201,7 @@ func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable
rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)

var lwCtx *ingest.WriterContext
var copCtx *copContext
if job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
bc, ok := ingest.LitBackCtxMgr.Load(job.ID)
if !ok {
@@ -1213,6 +1215,7 @@
if err != nil {
return nil, err
}
copCtx = newCopContext(t.Meta(), indexInfo, sessCtx)
}

return &addIndexWorker{
@@ -1227,6 +1230,7 @@ func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable
jobContext: jc,
},
index: index,
copCtx: copCtx,
writerCtx: lwCtx,
}, nil
}
@@ -1484,15 +1488,24 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC

oprStartTime := time.Now()
ctx := kv.WithInternalSourceType(context.Background(), w.jobContext.ddlJobSourceType())
errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error {
errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) (err error) {
taskCtx.addedCount = 0
taskCtx.scanCount = 0
txn.SetOption(kv.Priority, w.priority)
if tagger := w.reorgInfo.d.getResourceGroupTaggerForTopSQL(w.reorgInfo.Job); tagger != nil {
txn.SetOption(kv.ResourceGroupTagger, tagger)
}

idxRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange)
var (
idxRecords []*indexRecord
nextKey kv.Key
taskDone bool
)
if w.copCtx != nil {
idxRecords, nextKey, taskDone, err = w.fetchRowColValsFromCop(txn, handleRange)
} else {
idxRecords, nextKey, taskDone, err = w.fetchRowColVals(txn, handleRange)
}
if err != nil {
return errors.Trace(err)
}
221 changes: 221 additions & 0 deletions ddl/index_cop.go
@@ -0,0 +1,221 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tipb/go-tipb"
)

// copReadBatchFactor is the factor applied to the batch size of a coprocessor read.
// It multiplies tidb_ddl_reorg_batch_size to avoid sending too many cop requests for the same handle range.
const copReadBatchFactor = 10

func (w *addIndexWorker) fetchRowColValsFromCop(txn kv.Transaction, handleRange reorgBackfillTask) ([]*indexRecord, kv.Key, bool, error) {
w.idxRecords = w.idxRecords[:0]
start, end := handleRange.startKey, handleRange.excludedEndKey()
batchCnt := w.batchCnt * copReadBatchFactor
return fetchRowsFromCop(w.ctx, w.copCtx, start, end, txn.StartTS(), w.idxRecords, batchCnt)
}

// fetchRowsFromCop sends a coprocessor request and fetches the first batchCnt rows.
func fetchRowsFromCop(ctx context.Context, copCtx *copContext, startKey, endKey kv.Key, startTS uint64,
buf []*indexRecord, batchCnt int) ([]*indexRecord, kv.Key, bool, error) {
srcResult, err := copCtx.buildTableScan(ctx, startTS, startKey, endKey)
if err != nil {
return nil, nil, false, errors.Trace(err)
}
var done bool
buf, done, err = copCtx.fetchTableScanResult(ctx, srcResult, buf, batchCnt)
nextKey := endKey
if !done {
lastHandle := buf[len(buf)-1].handle
prefix := tablecodec.GenTableRecordPrefix(copCtx.tblInfo.ID)
nextKey = tablecodec.EncodeRecordKey(prefix, lastHandle).Next()
}
return buf, nextKey, done, err
}

type copContext struct {
tblInfo *model.TableInfo
idxInfo *model.IndexInfo
pkInfo *model.IndexInfo
colInfos []*model.ColumnInfo
fieldTps []*types.FieldType
sessCtx sessionctx.Context

srcChunk *chunk.Chunk
}

func newCopContext(tblInfo *model.TableInfo, idxInfo *model.IndexInfo, sessCtx sessionctx.Context) *copContext {
colInfos := make([]*model.ColumnInfo, 0, len(idxInfo.Columns))
fieldTps := make([]*types.FieldType, 0, len(idxInfo.Columns))
for _, idxCol := range idxInfo.Columns {
c := tblInfo.Columns[idxCol.Offset]
if c.IsGenerated() && !c.GeneratedStored {
// TODO(tangenta): support reading virtual generated columns.
return nil
}
colInfos = append(colInfos, c)
fieldTps = append(fieldTps, &c.FieldType)
}

pkColInfos, pkFieldTps, pkInfo := buildHandleColInfoAndFieldTypes(tblInfo)
colInfos = append(colInfos, pkColInfos...)
fieldTps = append(fieldTps, pkFieldTps...)

copCtx := &copContext{
tblInfo: tblInfo,
idxInfo: idxInfo,
pkInfo: pkInfo,
colInfos: colInfos,
fieldTps: fieldTps,
sessCtx: sessCtx,
srcChunk: chunk.NewChunkWithCapacity(fieldTps, variable.DefMaxChunkSize),
}
return copCtx
}

func (c *copContext) buildTableScan(ctx context.Context, startTS uint64, start, end kv.Key) (distsql.SelectResult, error) {
dagPB, err := buildDAGPB(c.sessCtx, c.tblInfo, c.colInfos)
if err != nil {
return nil, err
}

var builder distsql.RequestBuilder
kvReq, err := builder.
SetDAGRequest(dagPB).
SetStartTS(startTS).
SetKeyRanges([]kv.KeyRange{{StartKey: start, EndKey: end}}).
SetKeepOrder(true).
SetFromSessionVars(c.sessCtx.GetSessionVars()).
SetFromInfoSchema(c.sessCtx.GetDomainInfoSchema()).
SetConcurrency(1).
Build()
if err != nil {
return nil, err
}
return distsql.Select(ctx, c.sessCtx, kvReq, c.fieldTps, statistics.NewQueryFeedback(0, nil, 0, false))
}

func (c *copContext) fetchTableScanResult(ctx context.Context, result distsql.SelectResult,
buf []*indexRecord, batchCnt int) ([]*indexRecord, bool, error) {
sctx := c.sessCtx.GetSessionVars().StmtCtx
for {
err := result.Next(ctx, c.srcChunk)
if err != nil {
return nil, false, errors.Trace(err)
}
if c.srcChunk.NumRows() == 0 {
return buf, true, nil
}
iter := chunk.NewIterator4Chunk(c.srcChunk)
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
idxDt, hdDt := extractIdxValsAndHandle(row, c.idxInfo, c.fieldTps)
handle, err := buildHandle(hdDt, c.tblInfo, c.pkInfo, sctx)
if err != nil {
return nil, false, errors.Trace(err)
}
rsData := tables.TryGetHandleRestoredDataWrapper(c.tblInfo, hdDt, nil, c.idxInfo)
buf = append(buf, &indexRecord{handle: handle, key: nil, vals: idxDt, rsData: rsData, skip: false})
if len(buf) >= batchCnt {
return buf, false, nil
}
}
}
}

func buildDAGPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, colInfos []*model.ColumnInfo) (*tipb.DAGRequest, error) {
dagReq := &tipb.DAGRequest{}
dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(sCtx.GetSessionVars().Location())
sc := sCtx.GetSessionVars().StmtCtx
dagReq.Flags = sc.PushDownFlags()
for i := range colInfos {
dagReq.OutputOffsets = append(dagReq.OutputOffsets, uint32(i))
}
execPB, err := constructTableScanPB(sCtx, tblInfo, colInfos)
if err != nil {
return nil, err
}
dagReq.Executors = append(dagReq.Executors, execPB)
distsql.SetEncodeType(sCtx, dagReq)
return dagReq, nil
}

func constructTableScanPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, colInfos []*model.ColumnInfo) (*tipb.Executor, error) {
tblScan := tables.BuildTableScanFromInfos(tblInfo, colInfos)
tblScan.TableId = tblInfo.ID
err := tables.SetPBColumnsDefaultValue(sCtx, tblScan.Columns, colInfos)
return &tipb.Executor{Tp: tipb.ExecType_TypeTableScan, TblScan: tblScan}, err
}

func buildHandleColInfoAndFieldTypes(tbInfo *model.TableInfo) ([]*model.ColumnInfo, []*types.FieldType, *model.IndexInfo) {
if tbInfo.PKIsHandle {
for i := range tbInfo.Columns {
if mysql.HasPriKeyFlag(tbInfo.Columns[i].GetFlag()) {
return []*model.ColumnInfo{tbInfo.Columns[i]}, []*types.FieldType{&tbInfo.Columns[i].FieldType}, nil
}
}
} else if tbInfo.IsCommonHandle {
primaryIdx := tables.FindPrimaryIndex(tbInfo)
pkCols := make([]*model.ColumnInfo, 0, len(primaryIdx.Columns))
pkFts := make([]*types.FieldType, 0, len(primaryIdx.Columns))
for _, pkCol := range primaryIdx.Columns {
pkCols = append(pkCols, tbInfo.Columns[pkCol.Offset])
pkFts = append(pkFts, &tbInfo.Columns[pkCol.Offset].FieldType)
}
return pkCols, pkFts, primaryIdx
}
extra := model.NewExtraHandleColInfo()
return []*model.ColumnInfo{extra}, []*types.FieldType{&extra.FieldType}, nil
}

func extractIdxValsAndHandle(row chunk.Row, idxInfo *model.IndexInfo, fieldTps []*types.FieldType) ([]types.Datum, []types.Datum) {
datumBuf := make([]types.Datum, 0, len(fieldTps))
idxColLen := len(idxInfo.Columns)
for i, ft := range fieldTps {
datumBuf = append(datumBuf, row.GetDatum(i, ft))
}
return datumBuf[:idxColLen], datumBuf[idxColLen:]
}

func buildHandle(pkDts []types.Datum, tblInfo *model.TableInfo,
pkInfo *model.IndexInfo, stmtCtx *stmtctx.StatementContext) (kv.Handle, error) {
if tblInfo.IsCommonHandle {
tablecodec.TruncateIndexValues(tblInfo, pkInfo, pkDts)
handleBytes, err := codec.EncodeKey(stmtCtx, nil, pkDts...)
if err != nil {
return nil, err
}
return kv.NewCommonHandle(handleBytes)
}
return kv.IntHandle(pkDts[0].GetInt64()), nil
}
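As a standalone illustration (not from the commit) of the two handle shapes buildHandle returns: an integer handle wraps the int primary key (or the hidden _tidb_rowid) directly, while a common handle wraps the codec-encoded primary-key datums. The datum values here are made up.

package main

import (
	"fmt"

	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/sessionctx/stmtctx"
	"github.com/pingcap/tidb/types"
	"github.com/pingcap/tidb/util/codec"
)

func main() {
	// Case 1: integer primary key or hidden _tidb_rowid, wrapped directly.
	fmt.Println(kv.IntHandle(42).String())

	// Case 2: clustered (common) handle built from the encoded primary-key datums.
	sc := new(stmtctx.StatementContext)
	encoded, err := codec.EncodeKey(sc, nil, types.NewStringDatum("k1"), types.NewIntDatum(7))
	if err != nil {
		panic(err)
	}
	h, err := kv.NewCommonHandle(encoded)
	if err != nil {
		panic(err)
	}
	fmt.Println(h.String())
}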