Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util/admin: support admin check table on partition table (#12796) #13143

Merged
merged 5 commits into from
Nov 6, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions executor/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,109 @@ func (s *testSuite) TestAdminCheckTableFailed(c *C) {
tk.MustExec("admin check table admin_test")
}

func (s *testSuite) TestAdminCheckPartitionTableFailed(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists admin_test_p")
tk.MustExec("set @@tidb_enable_table_partition = 1")
tk.MustExec("create table admin_test_p (c1 int key,c2 int,c3 int,index idx(c2)) partition by range (c1) (" +
"partition p0 values less than (1)," +
"partition p1 values less than (2)," +
"partition p2 values less than (3)," +
"partition p3 values less than (4)," +
"partition p4 values less than (5)," +
"partition p5 values less than (maxvalue))")
tk.MustExec("insert admin_test_p (c1, c2, c3) values (0,0,0), (1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5)")
tk.MustExec("admin check table admin_test_p")

// Make some corrupted index. Build the index information.
s.ctx = mock.NewContext()
s.ctx.Store = s.store
is := s.domain.InfoSchema()
dbName := model.NewCIStr("test")
tblName := model.NewCIStr("admin_test_p")
tbl, err := is.TableByName(dbName, tblName)
c.Assert(tbl, NotNil)
c.Assert(err, IsNil)
tblInfo := tbl.Meta()
idxInfo := tblInfo.Indices[0]
sc := s.ctx.GetSessionVars().StmtCtx
tk.Se.GetSessionVars().IndexLookupSize = 3
tk.Se.GetSessionVars().MaxChunkSize = 3

// Reduce one row of index on partitions.
// Table count > index count.
for i := 0; i <= 5; i++ {
partitionIdx := i % len(tblInfo.GetPartitionInfo().Definitions)
indexOpr := tables.NewIndex(tblInfo.GetPartitionInfo().Definitions[partitionIdx].ID, tblInfo, idxInfo)
txn, err := s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums(i), int64(i))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
err = tk.ExecToErr("admin check table admin_test_p")
c.Assert(err.Error(), Equals, fmt.Sprintf("[executor:8003]admin_test_p err:[admin:1]index:<nil> != record:&admin.RecordData{Handle:%d, Values:[]types.Datum{types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:%d, b:[]uint8(nil), x:interface {}(nil)}}}", i, i))
c.Assert(executor.ErrAdminCheckTable.Equal(err), IsTrue)
// TODO: fix admin recover for partition table.
//r := tk.MustQuery("admin recover index admin_test_p idx")
//r.Check(testkit.Rows("0 0"))
//tk.MustExec("admin check table admin_test_p")
// Manual recover index.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(i), int64(i))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
tk.MustExec("admin check table admin_test_p")
}

// Add one row of index on partitions.
// Table count < index count.
for i := 0; i <= 5; i++ {
partitionIdx := i % len(tblInfo.GetPartitionInfo().Definitions)
indexOpr := tables.NewIndex(tblInfo.GetPartitionInfo().Definitions[partitionIdx].ID, tblInfo, idxInfo)
txn, err := s.store.Begin()
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(i+8), int64(i+8))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
err = tk.ExecToErr("admin check table admin_test_p")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, fmt.Sprintf("handle %d, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:%d, b:[]uint8(nil), x:interface {}(nil)} != record:<nil>", i+8, i+8))
// TODO: fix admin recover for partition table.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums(i+8), int64(i+8))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
tk.MustExec("admin check table admin_test_p")
}

// Table count = index count, but the index value was wrong.
for i := 0; i <= 5; i++ {
partitionIdx := i % len(tblInfo.GetPartitionInfo().Definitions)
indexOpr := tables.NewIndex(tblInfo.GetPartitionInfo().Definitions[partitionIdx].ID, tblInfo, idxInfo)
txn, err := s.store.Begin()
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(i+8), int64(i))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
err = tk.ExecToErr("admin check table admin_test_p")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, fmt.Sprintf("col c2, handle %d, index:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:%d, b:[]uint8(nil), x:interface {}(nil)} != record:types.Datum{k:0x1, collation:0x0, decimal:0x0, length:0x0, i:%d, b:[]uint8(nil), x:interface {}(nil)}", i, i+8, i))
// TODO: fix admin recover for partition table.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums(i+8), int64(i))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
tk.MustExec("admin check table admin_test_p")
}
}

func (s *testSuite) TestAdminCheckTable(c *C) {
// test NULL value.
tk := testkit.NewTestKit(c, s.store)
Expand Down
15 changes: 10 additions & 5 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,12 +332,12 @@ func (b *executorBuilder) buildCheckTable(v *plannercore.CheckTable) Executor {
e := &CheckTableExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
dbName: v.DBName,
tblInfo: v.TblInfo,
indices: v.Indices,
table: v.Table,
indexInfos: v.IndexInfos,
is: b.is,
srcs: readerExecs,
exitCh: make(chan struct{}),
retCh: make(chan error, len(v.Indices)),
retCh: make(chan error, len(readerExecs)),
}
return e
}
Expand Down Expand Up @@ -1769,12 +1769,17 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
return nil, errors.Trace(err)
}
ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
table, _ := b.is.TableByID(ts.Table.ID)
tbl, _ := b.is.TableByID(ts.Table.ID)
isPartition, physicalTableID := ts.IsPartition()
if isPartition {
pt := tbl.(table.PartitionedTable)
tbl = pt.GetPartition(physicalTableID)
}
e := &TableReaderExecutor{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
dagPB: dagReq,
physicalTableID: ts.Table.ID,
table: table,
table: tbl,
keepOrder: ts.KeepOrder,
desc: ts.Desc,
columns: ts.Columns,
Expand Down
64 changes: 40 additions & 24 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -431,14 +432,14 @@ func getTableName(is infoschema.InfoSchema, id int64) string {
type CheckTableExec struct {
baseExecutor

dbName string
tblInfo *model.TableInfo
indices []table.Index
srcs []*IndexLookUpExecutor
done bool
is infoschema.InfoSchema
exitCh chan struct{}
retCh chan error
dbName string
table table.Table
indexInfos []*model.IndexInfo
srcs []*IndexLookUpExecutor
done bool
is infoschema.InfoSchema
exitCh chan struct{}
retCh chan error
}

// Open implements the Executor Open interface.
Expand Down Expand Up @@ -466,7 +467,20 @@ func (e *CheckTableExec) Close() error {
return firstErr
}

func (e *CheckTableExec) checkIndexHandle(ctx context.Context, num int, src *IndexLookUpExecutor) error {
func (e *CheckTableExec) checkTableIndexHandle(ctx context.Context, idxInfo *model.IndexInfo) error {
// For partition table, there will be multi same index indexLookUpReaders on different partitions.
for _, src := range e.srcs {
if src.index.Name.L == idxInfo.Name.L {
err := e.checkIndexHandle(ctx, src)
if err != nil {
return err
}
}
}
return nil
}

func (e *CheckTableExec) checkIndexHandle(ctx context.Context, src *IndexLookUpExecutor) error {
cols := src.schema.Columns
retFieldTypes := make([]*types.FieldType, len(cols))
for i := range cols {
Expand Down Expand Up @@ -507,20 +521,19 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
defer func() { e.done = true }()

idxNames := make([]string, 0, len(e.indices))
for _, idx := range e.indices {
idxNames = append(idxNames, idx.Meta().Name.O)
idxNames := make([]string, 0, len(e.indexInfos))
for _, idx := range e.indexInfos {
idxNames = append(idxNames, idx.Name.O)
}
greater, idxOffset, err := admin.CheckIndicesCount(e.ctx, e.dbName, e.tblInfo.Name.O, idxNames)
greater, idxOffset, err := admin.CheckIndicesCount(e.ctx, e.dbName, e.table.Meta().Name.O, idxNames)
if err != nil {
tbl := e.srcs[idxOffset].table
if greater == admin.IdxCntGreater {
err = e.checkIndexHandle(ctx, idxOffset, e.srcs[idxOffset])
err = e.checkTableIndexHandle(ctx, e.indexInfos[idxOffset])
} else if greater == admin.TblCntGreater {
err = e.checkTableRecord(tbl, idxOffset)
err = e.checkTableRecord(idxOffset)
}
if err != nil && admin.ErrDataInConsistent.Equal(err) {
return ErrAdminCheckTable.GenWithStack("%v err:%v", tbl.Meta().Name, err)
return ErrAdminCheckTable.GenWithStack("%v err:%v", e.table.Meta().Name, err)
}
return errors.Trace(err)
}
Expand All @@ -534,7 +547,7 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error {
go func(num int) {
defer wg.Done()
util.WithRecovery(func() {
err1 := e.checkIndexHandle(ctx, num, e.srcs[num])
err1 := e.checkIndexHandle(ctx, e.srcs[num])
if err1 != nil {
logutil.Logger(ctx).Info("check index handle failed", zap.Error(err))
}
Expand All @@ -555,21 +568,24 @@ func (e *CheckTableExec) Next(ctx context.Context, req *chunk.Chunk) error {
return nil
}

func (e *CheckTableExec) checkTableRecord(tbl table.Table, idxOffset int) error {
idx := e.indices[idxOffset]
func (e *CheckTableExec) checkTableRecord(idxOffset int) error {
idxInfo := e.indexInfos[idxOffset]
// TODO: Fix me later, can not use genExprs in indexLookUpReader, because the schema of expression is different.
genExprs := e.srcs[idxOffset].genExprs
txn, err := e.ctx.Txn(true)
if err != nil {
return errors.Trace(err)
}
if tbl.Meta().GetPartitionInfo() == nil {
return admin.CheckRecordAndIndex(e.ctx, txn, tbl, idx, genExprs)
if e.table.Meta().GetPartitionInfo() == nil {
idx := tables.NewIndex(e.table.Meta().ID, e.table.Meta(), idxInfo)
return admin.CheckRecordAndIndex(e.ctx, txn, e.table, idx, genExprs)
}

info := tbl.Meta().GetPartitionInfo()
info := e.table.Meta().GetPartitionInfo()
for _, def := range info.Definitions {
pid := def.ID
partition := tbl.(table.PartitionedTable).GetPartition(pid)
partition := e.table.(table.PartitionedTable).GetPartition(pid)
idx := tables.NewIndex(def.ID, e.table.Meta(), idxInfo)
if err := admin.CheckRecordAndIndex(e.ctx, txn, partition, idx, genExprs); err != nil {
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ type CheckTable struct {
baseSchemaProducer

DBName string
TblInfo *model.TableInfo
Indices []table.Index
Table table.Table
IndexInfos []*model.IndexInfo
IndexLookUpReaders []*PhysicalIndexLookUpReader
}

Expand Down
2 changes: 1 addition & 1 deletion planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ func (p *LogicalJoin) constructInnerIndexScan(ds *DataSource, idx *model.IndexIn
cop.tablePlan = ts
}

is.initSchema(ds.id, idx, cop.tablePlan != nil)
is.initSchema(idx, cop.tablePlan != nil)
indexConds, tblConds := splitIndexFilterConditions(remainedConds, idx.Columns, ds.tableInfo)
path := &accessPath{
indexFilters: indexConds,
Expand Down
4 changes: 2 additions & 2 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ func (ds *DataSource) convertToIndexScan(prop *property.PhysicalProperty, candid
// If it's parent requires double read task, return max cost.
return invalidTask, nil
}
is.initSchema(ds.id, idx, cop.tablePlan != nil)
is.initSchema(idx, cop.tablePlan != nil)
// Only use expectedCnt when it's smaller than the count we calculated.
// e.g. IndexScan(count1)->After Filter(count2). The `ds.stats.RowCount` is count2. count1 is the one we need to calculate
// If expectedCnt and count2 are both zero and we go into the below `if` block, the count1 will be set to zero though it's shouldn't be.
Expand Down Expand Up @@ -542,7 +542,7 @@ func (ds *DataSource) convertToIndexScan(prop *property.PhysicalProperty, candid
}

// TODO: refactor this part, we should not call Clone in fact.
func (is *PhysicalIndexScan) initSchema(id int, idx *model.IndexInfo, isDoubleRead bool) {
func (is *PhysicalIndexScan) initSchema( idx *model.IndexInfo, isDoubleRead bool) {
indexCols := make([]*expression.Column, 0, len(idx.Columns))
for _, col := range idx.Columns {
colFound := is.dataSourceSchema.FindColumnByName(col.Name.L)
Expand Down
Loading