Skip to content

Commit

Permalink
Merge branch 'master' into issue-31372
Browse files Browse the repository at this point in the history
  • Loading branch information
unconsolable authored Jan 26, 2022
2 parents 4d967d2 + 345a94e commit 4be1bd6
Show file tree
Hide file tree
Showing 7 changed files with 11 additions and 441 deletions.
87 changes: 2 additions & 85 deletions cmd/ddltest/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,15 @@ package ddltest

import (
"fmt"
"io"
"math"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/store/gcworker"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
"github.com/stretchr/testify/require"
goctx "golang.org/x/net/context"
)
Expand All @@ -44,90 +39,12 @@ func getIndex(t table.Table, name string) table.Index {
return nil
}

func (s *ddlSuite) checkAddIndex(t *testing.T, indexInfo *model.IndexInfo) {
ctx := s.ctx
err := ctx.NewTxn(goctx.Background())
require.NoError(t, err)
tbl := s.getTable(t, "test_index")

// read handles form table
handles := kv.NewHandleMap()
err = tables.IterRecords(tbl, ctx, tbl.Cols(),
func(h kv.Handle, data []types.Datum, cols []*table.Column) (bool, error) {
handles.Set(h, struct{}{})
return true, nil
})
require.NoError(t, err)

// read handles from index
idx := tables.NewIndex(tbl.Meta().ID, tbl.Meta(), indexInfo)
err = ctx.NewTxn(goctx.Background())
require.NoError(t, err)
txn, err := ctx.Txn(false)
require.NoError(t, err)
defer func() {
err = txn.Rollback()
require.NoError(t, err)
}()

it, err := idx.SeekFirst(txn)
require.NoError(t, err)
defer it.Close()

for {
_, h, err := it.Next()
if terror.ErrorEqual(err, io.EOF) {
break
}

require.NoError(t, err)
_, ok := handles.Get(h)
require.True(t, ok)
handles.Delete(h)
}

require.Equal(t, 0, handles.Len())
}

func (s *ddlSuite) checkDropIndex(t *testing.T, indexInfo *model.IndexInfo) {
gcWorker, err := gcworker.NewMockGCWorker(s.store)
require.NoError(t, err)
err = gcWorker.DeleteRanges(goctx.Background(), uint64(math.MaxInt32))
require.NoError(t, err)

ctx := s.ctx
err = ctx.NewTxn(goctx.Background())
require.NoError(t, err)
tbl := s.getTable(t, "test_index")

// read handles from index
idx := tables.NewIndex(tbl.Meta().ID, tbl.Meta(), indexInfo)
err = ctx.NewTxn(goctx.Background())
require.NoError(t, err)
txn, err := ctx.Txn(false)
require.NoError(t, err)
defer func() {
err := txn.Rollback()
require.NoError(t, err)
}()

it, err := idx.SeekFirst(txn)
require.NoError(t, err)
defer it.Close()

handles := kv.NewHandleMap()
for {
_, h, err := it.Next()
if terror.ErrorEqual(err, io.EOF) {
break
}

require.NoError(t, err)
handles.Set(h, struct{}{})
}

// TODO: Uncomment this after apply pool is finished
// c.Assert(handles.Len(), Equals, 0)
s.mustExec(fmt.Sprintf("admin check table %s", indexInfo.Table.String()))
}

// TestIndex operations on table test_index (c int, c1 bigint, c2 double, c3 varchar(256), primary key(c)).
Expand Down Expand Up @@ -194,7 +111,7 @@ func TestIndex(t *testing.T) {
if col.Add {
require.NotNil(t, index)
oldIndex = index
s.checkAddIndex(t, index.Meta())
s.mustExec("admin check table test_index")
} else {
require.Nil(t, index)
s.checkDropIndex(t, oldIndex.Meta())
Expand Down
18 changes: 1 addition & 17 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"bytes"
"context"
"fmt"
"io"
"math"
"math/rand"
"sort"
Expand Down Expand Up @@ -1452,22 +1451,7 @@ LOOP:

c.Assert(ctx.NewTxn(context.Background()), IsNil)

it, err := nidx.SeekFirst(txn)
c.Assert(err, IsNil)
defer it.Close()

for {
_, h, err := it.Next()
if terror.ErrorEqual(err, io.EOF) {
break
}

c.Assert(err, IsNil)
_, ok := handles.Get(h)
c.Assert(ok, IsTrue, Commentf("handle: %v", h.String()))
handles.Delete(h)
}
c.Assert(handles.Len(), Equals, 0)
tk.MustExec("admin check table test_add_index")
tk.MustExec("drop table test_add_index")
}

Expand Down
5 changes: 3 additions & 2 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/table"
goutil "github.com/pingcap/tidb/util"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/gcutil"
"github.com/pingcap/tidb/util/logutil"
"go.etcd.io/etcd/clientv3"
Expand Down Expand Up @@ -185,7 +186,7 @@ type ddl struct {
m sync.RWMutex
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup // It's only used to deal with data race in restart_test.
wg tidbutil.WaitGroupWrapper // It's only used to deal with data race in restart_test.
limitJobCh chan *limitJobTask

*ddlCtx
Expand Down Expand Up @@ -406,7 +407,7 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
metrics.DDLCounter.WithLabelValues(metrics.CreateDDLInstance).Inc()

// Start some background routine to manage TiFlash replica.
go d.PollTiFlashRoutine()
d.wg.Run(d.PollTiFlashRoutine)

return nil
}
Expand Down
10 changes: 5 additions & 5 deletions ddl/ddl_tiflash_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,11 @@ func HandlePlacementRuleRoutine(ctx sessionctx.Context, d *ddl, tableList []TiFl
func (d *ddl) PollTiFlashRoutine() {
pollTiflashContext := NewTiFlashManagementContext()
for {
select {
case <-d.ctx.Done():
return
case <-time.After(PollTiFlashInterval):
}
if d.IsTiFlashPollEnabled() {
if d.sessPool == nil {
logutil.BgLogger().Error("failed to get sessionPool for pollTiFlashReplicaStatus")
Expand All @@ -437,10 +442,5 @@ func (d *ddl) PollTiFlashRoutine() {
logutil.BgLogger().Error("failed to get session for pollTiFlashReplicaStatus", zap.Error(err))
}
}
select {
case <-d.ctx.Done():
return
case <-time.After(PollTiFlashInterval):
}
}
}
6 changes: 0 additions & 6 deletions table/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,10 @@ type Index interface {
Create(ctx sessionctx.Context, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle, handleRestoreData []types.Datum, opts ...CreateIdxOptFunc) (kv.Handle, error)
// Delete supports delete from statement.
Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) error
// Drop supports drop table, drop index statements.
Drop(txn kv.Transaction) error
// Exist supports check index exists or not.
Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) (bool, kv.Handle, error)
// GenIndexKey generates an index key.
GenIndexKey(sc *stmtctx.StatementContext, indexedValues []types.Datum, h kv.Handle, buf []byte) (key []byte, distinct bool, err error)
// Seek supports where clause.
Seek(sc *stmtctx.StatementContext, r kv.Retriever, indexedValues []types.Datum) (iter IndexIterator, hit bool, err error)
// SeekFirst supports aggregate min and ascend order by.
SeekFirst(r kv.Retriever) (iter IndexIterator, err error)
// FetchValues fetched index column values in a row.
// Param columns is a reused buffer, if it is not nil, FetchValues will fill the index values in it,
// and return the buffer, if it is nil, FetchValues will allocate the buffer instead.
Expand Down
110 changes: 0 additions & 110 deletions table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,9 @@ package tables

import (
"context"
"io"
"sync"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
Expand All @@ -33,54 +30,6 @@ import (
"github.com/pingcap/tidb/util/rowcodec"
)

// indexIter is for KV store index iterator.
type indexIter struct {
it kv.Iterator
idx *index
prefix kv.Key
colInfos []rowcodec.ColInfo
tps []*types.FieldType
}

// Close does the clean up works when KV store index iterator is closed.
func (c *indexIter) Close() {
if c.it != nil {
c.it.Close()
c.it = nil
}
}

// Next returns current key and moves iterator to the next step.
func (c *indexIter) Next() (indexData []types.Datum, h kv.Handle, err error) {
if !c.it.Valid() {
return nil, nil, errors.Trace(io.EOF)
}
if !c.it.Key().HasPrefix(c.prefix) {
return nil, nil, errors.Trace(io.EOF)
}
vals, err := tablecodec.DecodeIndexKV(c.it.Key(), c.it.Value(), len(c.colInfos), tablecodec.HandleNotNeeded, c.colInfos)
if err != nil {
return nil, nil, errors.Trace(err)
}
handle, err := tablecodec.DecodeIndexHandle(c.it.Key(), c.it.Value(), len(c.colInfos))
if err != nil {
return nil, nil, errors.Trace(err)
}
for i, v := range vals {
d, err := tablecodec.DecodeColumnValue(v, c.tps[i], time.Local)
if err != nil {
return nil, nil, errors.Trace(err)
}
indexData = append(indexData, d)
}
// update new iter to next
err = c.it.Next()
if err != nil {
return nil, nil, err
}
return indexData, handle, nil
}

// index is the data structure for index data in the KV store.
type index struct {
idxInfo *model.IndexInfo
Expand Down Expand Up @@ -255,65 +204,6 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed
return err
}

// Drop removes the KV index from store.
func (c *index) Drop(txn kv.Transaction) error {
it, err := txn.Iter(c.prefix, c.prefix.PrefixNext())
if err != nil {
return err
}
defer it.Close()

// remove all indices
for it.Valid() {
if !it.Key().HasPrefix(c.prefix) {
break
}
err := txn.GetMemBuffer().Delete(it.Key())
if err != nil {
return err
}
err = it.Next()
if err != nil {
return err
}
}
return nil
}

// Seek searches KV index for the entry with indexedValues.
func (c *index) Seek(sc *stmtctx.StatementContext, r kv.Retriever, indexedValues []types.Datum) (iter table.IndexIterator, hit bool, err error) {
key, _, err := c.GenIndexKey(sc, indexedValues, nil, nil)
if err != nil {
return nil, false, err
}

upperBound := c.prefix.PrefixNext()
it, err := r.Iter(key, upperBound)
if err != nil {
return nil, false, err
}
// check if hit
hit = false
if it.Valid() && it.Key().Cmp(key) == 0 {
hit = true
}
colInfos := BuildRowcodecColInfoForIndexColumns(c.idxInfo, c.tblInfo)
tps := BuildFieldTypesForIndexColumns(c.idxInfo, c.tblInfo)
return &indexIter{it: it, idx: c, prefix: c.prefix, colInfos: colInfos, tps: tps}, hit, nil
}

// SeekFirst returns an iterator which points to the first entry of the KV index.
func (c *index) SeekFirst(r kv.Retriever) (iter table.IndexIterator, err error) {
upperBound := c.prefix.PrefixNext()
it, err := r.Iter(c.prefix, upperBound)
if err != nil {
return nil, err
}
colInfos := BuildRowcodecColInfoForIndexColumns(c.idxInfo, c.tblInfo)
tps := BuildFieldTypesForIndexColumns(c.idxInfo, c.tblInfo)
return &indexIter{it: it, idx: c, prefix: c.prefix, colInfos: colInfos, tps: tps}, nil
}

func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) (bool, kv.Handle, error) {
key, distinct, err := c.GenIndexKey(sc, indexedValues, h, nil)
if err != nil {
Expand Down
Loading

0 comments on commit 4be1bd6

Please sign in to comment.