Skip to content

Commit

Permalink
Merge branch 'release-5.4' into release-5.4-f8a00f33e8e6
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored Jun 23, 2022
2 parents b8374eb + 10dd152 commit c6d3264
Show file tree
Hide file tree
Showing 22 changed files with 400 additions and 63 deletions.
10 changes: 10 additions & 0 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/utils"
Expand Down Expand Up @@ -598,6 +599,15 @@ func drainFilesByRange(files []*backuppb.File, supportMulti bool) ([]*backuppb.F
return files[:idx], files[idx:]
}

// SplitRanges implements TiKVRestorer.
func (rc *Client) SplitRanges(ctx context.Context,
ranges []rtree.Range,
rewriteRules *RewriteRules,
updateCh glue.Progress,
isRawKv bool) error {
return SplitRanges(ctx, rc, ranges, rewriteRules, updateCh, isRawKv)
}

// RestoreFiles tries to restore the files.
func (rc *Client) RestoreFiles(
ctx context.Context,
Expand Down
40 changes: 31 additions & 9 deletions br/pkg/restore/pipeline_items.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"time"

"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/br/pkg/summary"
Expand Down Expand Up @@ -183,8 +185,27 @@ type BatchSender interface {
Close()
}

// TiKVRestorer is the minimal methods required for restoring.
// It contains the primitive APIs extract from `restore.Client`, so some of arguments may seem redundant.
// Maybe TODO: make a better abstraction?
type TiKVRestorer interface {
// SplitRanges split regions implicated by the ranges and rewrite rules.
// After spliting, it also scatters the fresh regions.
SplitRanges(ctx context.Context,
ranges []rtree.Range,
rewriteRules *RewriteRules,
updateCh glue.Progress,
isRawKv bool) error
// RestoreFiles import the files to the TiKV.
RestoreFiles(ctx context.Context,
files []*backuppb.File,
rewriteRules *RewriteRules,
updateCh glue.Progress) error
}

type tikvSender struct {
client *Client
client TiKVRestorer

updateCh glue.Progress

sink TableSink
Expand All @@ -209,7 +230,7 @@ func (b *tikvSender) RestoreBatch(ranges DrainResult) {
// NewTiKVSender make a sender that send restore requests to TiKV.
func NewTiKVSender(
ctx context.Context,
cli *Client,
cli TiKVRestorer,
updateCh glue.Progress,
splitConcurrency uint,
) (BatchSender, error) {
Expand Down Expand Up @@ -252,9 +273,9 @@ func (b *tikvSender) splitWorker(ctx context.Context,
b.wg.Done()
if err := eg.Wait(); err != nil {
b.sink.EmitError(err)
return
}
close(next)
log.Info("TiKV Sender: split worker exits.")
}()

start := time.Now()
Expand All @@ -266,7 +287,7 @@ func (b *tikvSender) splitWorker(ctx context.Context,
pool := utils.NewWorkerPool(concurrency, "split")
for {
select {
case <-ctx.Done():
case <-ectx.Done():
return
case result, ok := <-ranges:
if !ok {
Expand All @@ -289,7 +310,7 @@ func (b *tikvSender) splitWorker(ctx context.Context,
// hence the checksum would fail.
done := b.registerTableIsRestoring(result.TablesToSend)
pool.ApplyOnErrorGroup(eg, func() error {
err := SplitRanges(ectx, b.client, result.Ranges, result.RewriteRules, b.updateCh, false)
err := b.client.SplitRanges(ectx, result.Ranges, result.RewriteRules, b.updateCh, false)
if err != nil {
log.Error("failed on split range", rtree.ZapRanges(result.Ranges), zap.Error(err))
return err
Expand Down Expand Up @@ -338,17 +359,17 @@ func (b *tikvSender) waitTablesDone(ts []CreatedTable) {
func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan drainResultAndDone) {
eg, ectx := errgroup.WithContext(ctx)
defer func() {
log.Debug("restore worker closed")
log.Info("TiKV Sender: restore worker prepare to close.")
if err := eg.Wait(); err != nil {
b.sink.EmitError(err)
return
}
b.wg.Done()
b.sink.Close()
b.wg.Done()
log.Info("TiKV Sender: restore worker exits.")
}()
for {
select {
case <-ctx.Done():
case <-ectx.Done():
return
case r, ok := <-ranges:
if !ok {
Expand All @@ -360,6 +381,7 @@ func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan drainResul
eg.Go(func() error {
e := b.client.RestoreFiles(ectx, files, r.result.RewriteRules, b.updateCh)
if e != nil {
log.Error("restore batch meet error", logutil.ShortError(e), logutil.Files(files))
r.done()
return e
}
Expand Down
125 changes: 125 additions & 0 deletions br/pkg/restore/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,14 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/restore"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/br/pkg/utils"
Expand Down Expand Up @@ -572,3 +577,123 @@ func (s *testRangeSuite) TestRegionConsistency(c *C) {
ca.err)
}
}

type fakeRestorer struct {
errorInSplit bool
splitRanges []rtree.Range
restoredFiles []*backuppb.File
}

func (f *fakeRestorer) SplitRanges(ctx context.Context, ranges []rtree.Range, rewriteRules *restore.RewriteRules, updateCh glue.Progress, isRawKv bool) error {
if ctx.Err() != nil {
return ctx.Err()
}
f.splitRanges = append(f.splitRanges, ranges...)
if f.errorInSplit {
err := errors.Annotatef(berrors.ErrRestoreSplitFailed,
"the key space takes many efforts and finally get together, how dare you split them again... :<")
log.Error("error happens :3", logutil.ShortError(err))
return err
}
return nil
}

func (f *fakeRestorer) RestoreFiles(ctx context.Context, files []*backuppb.File, rewriteRules *restore.RewriteRules, updateCh glue.Progress) error {
if ctx.Err() != nil {
return ctx.Err()
}
f.restoredFiles = append(f.restoredFiles, files...)
err := errors.Annotatef(berrors.ErrRestoreWriteAndIngest, "the files to restore are taken by a hijacker, meow :3")
log.Error("error happens :3", logutil.ShortError(err))
return err
}

func fakeRanges(keys ...string) (r restore.DrainResult) {
for i := range keys {
if i+1 == len(keys) {
return
}
r.Ranges = append(r.Ranges, rtree.Range{
StartKey: []byte(keys[i]),
EndKey: []byte(keys[i+1]),
Files: []*backuppb.File{{Name: "fake.sst"}},
})
}
return
}

type errorInTimeSink struct {
ctx context.Context
errCh chan error
t *testing.T
}

func (e errorInTimeSink) EmitTables(tables ...restore.CreatedTable) {}

func (e errorInTimeSink) EmitError(err error) {
e.errCh <- err
}

func (e errorInTimeSink) Close() {}

func (e errorInTimeSink) Wait() {
select {
case <-e.ctx.Done():
e.t.Logf("The context is canceled but no error happen")
e.t.FailNow()
case <-e.errCh:
}
}

func assertErrorEmitInTime(ctx context.Context, t *testing.T) errorInTimeSink {
errCh := make(chan error, 1)
return errorInTimeSink{
ctx: ctx,
errCh: errCh,
t: t,
}
}

func TestRestoreFailed(t *testing.T) {
ranges := []restore.DrainResult{
fakeRanges("aax", "abx", "abz"),
fakeRanges("abz", "bbz", "bcy"),
fakeRanges("bcy", "cad", "xxy"),
}
r := &fakeRestorer{}
sender, err := restore.NewTiKVSender(context.TODO(), r, nil, 1)
require.NoError(t, err)
dctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
sink := assertErrorEmitInTime(dctx, t)
sender.PutSink(sink)
for _, r := range ranges {
sender.RestoreBatch(r)
}
sink.Wait()
sink.Close()
sender.Close()
require.GreaterOrEqual(t, len(r.restoredFiles), 1)
}

func TestSplitFailed(t *testing.T) {
ranges := []restore.DrainResult{
fakeRanges("aax", "abx", "abz"),
fakeRanges("abz", "bbz", "bcy"),
fakeRanges("bcy", "cad", "xxy"),
}
r := &fakeRestorer{errorInSplit: true}
sender, err := restore.NewTiKVSender(context.TODO(), r, nil, 1)
require.NoError(t, err)
dctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
sink := assertErrorEmitInTime(dctx, t)
sender.PutSink(sink)
for _, r := range ranges {
sender.RestoreBatch(r)
}
sink.Wait()
sender.Close()
require.GreaterOrEqual(t, len(r.splitRanges), 2)
require.Len(t, r.restoredFiles, 0)
}
4 changes: 2 additions & 2 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1332,7 +1332,7 @@ func TestSavedAnalyzeOptions(t *testing.T) {
require.Equal(t, 2, len(col0.Buckets))

// manual analyze uses the table-level persisted options by merging the new options
tk.MustExec("analyze table t columns a,b with 0.9 samplerate, 3 buckets")
tk.MustExec("analyze table t columns a,b with 1 samplerate, 3 buckets")
tbl = h.GetTableStats(tableInfo)
require.Greater(t, tbl.Version, lastVersion)
lastVersion = tbl.Version
Expand All @@ -1347,7 +1347,7 @@ func TestSavedAnalyzeOptions(t *testing.T) {
// The columns are: table_id, sample_num, sample_rate, buckets, topn, column_choice, column_ids.
rs = tk.MustQuery("select * from mysql.analyze_options where table_id=" + strconv.FormatInt(tbl.PhysicalID, 10))
require.Equal(t, 1, len(rs.Rows()))
require.Equal(t, "0.9", rs.Rows()[0][2])
require.Equal(t, "1", rs.Rows()[0][2])
require.Equal(t, "3", rs.Rows()[0][3])
require.Equal(t, "1", rs.Rows()[0][4])
require.Equal(t, "LIST", rs.Rows()[0][5])
Expand Down
20 changes: 20 additions & 0 deletions executor/cte_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,3 +440,23 @@ func TestCTEExecError(t *testing.T) {
require.True(t, terror.ErrorEqual(err, types.ErrOverflow))
}
}

// https://github.com/pingcap/tidb/issues/33965.
func TestCTEsInView(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")

tk.MustExec("create database if not exists test1;")
tk.MustExec("create table test.t (a int);")
tk.MustExec("create table test1.t (a int);")
tk.MustExec("insert into test.t values (1);")
tk.MustExec("insert into test1.t values (2);")

tk.MustExec("use test;")
tk.MustExec("create definer='root'@'localhost' view test.v as with tt as (select * from t) select * from tt;")
tk.MustQuery("select * from test.v;").Check(testkit.Rows("1"))
tk.MustExec("use test1;")
tk.MustQuery("select * from test.v;").Check(testkit.Rows("1"))
}
3 changes: 3 additions & 0 deletions executor/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"os"
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/testkit"
Expand Down Expand Up @@ -49,6 +50,8 @@ func TestMain(m *testing.M) {
conf.Experimental.AllowsExpressionIndex = true
})
tikv.EnableFailpoints()
failpoint.Enable("tikvclient/injectLiveness", `return("reachable")`)
defer failpoint.Disable("tikvclient/injectLiveness")
tmpDir := config.GetGlobalConfig().TempStoragePath
_ = os.RemoveAll(tmpDir) // clean the uncleared temp file during the last run.
_ = os.MkdirAll(tmpDir, 0755)
Expand Down
19 changes: 19 additions & 0 deletions executor/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,25 @@ func (s *tiflashTestSuite) TestReadPartitionTable(c *C) {
tk.MustExec("commit")
}

func (s *tiflashTestSuite) TestAggPushDownApplyAll(c *C) {
tk := testkit.NewTestKit(c, s.store)

tk.MustExec("use test")
tk.MustExec("drop table if exists foo")
tk.MustExec("drop table if exists bar")
tk.MustExec("create table foo(a int, b int)")
tk.MustExec("create table bar(a double not null, b decimal(65,0) not null)")
tk.MustExec("alter table foo set tiflash replica 1")
tk.MustExec("alter table bar set tiflash replica 1")
tk.MustExec("insert into foo values(0, NULL)")
tk.MustExec("insert into bar values(0, 0)")

tk.MustExec("set @@session.tidb_allow_mpp=1")
tk.MustExec("set @@session.tidb_enforce_mpp=1")

tk.MustQuery("select * from foo where a=all(select a from bar where bar.b=foo.b)").Check(testkit.Rows("0 <nil>"))
}

func (s *tiflashTestSuite) TestReadUnsigedPK(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
2 changes: 1 addition & 1 deletion expression/aggregation/base_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func (a *baseFuncDesc) typeInfer4MaxMin(ctx sessionctx.Context) {
a.Args[0] = expression.BuildCastFunction(ctx, a.Args[0], tp)
}
a.RetTp = a.Args[0].GetType()
if (a.Name == ast.AggFuncMax || a.Name == ast.AggFuncMin) && a.RetTp.Tp != mysql.TypeBit {
if a.Name == ast.AggFuncMax || a.Name == ast.AggFuncMin {
a.RetTp = a.Args[0].GetType().Clone()
a.RetTp.Flag &^= mysql.NotNullFlag
}
Expand Down
29 changes: 29 additions & 0 deletions expression/aggregation/base_func_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,32 @@ func TestClone(t *testing.T) {
require.Equal(t, col, desc.Args[0])
require.False(t, desc.equal(ctx, cloned))
}

func TestBaseFunc_InferAggRetType(t *testing.T) {
ctx := mock.NewContext()
doubleType := types.NewFieldType(mysql.TypeDouble)
bitType := types.NewFieldType(mysql.TypeBit)

funcNames := []string{
ast.AggFuncMax, ast.AggFuncMin,
}
dataTypes := []*types.FieldType{
doubleType, bitType,
}

for _, dataType := range dataTypes {
notNullType := dataType.Clone()
notNullType.Flag |= mysql.NotNullFlag
col := &expression.Column{
UniqueID: 0,
RetType: notNullType,
}
for _, name := range funcNames {
desc, err := newBaseFuncDesc(ctx, name, []expression.Expression{col})
require.NoError(t, err)
err = desc.TypeInfer(ctx)
require.NoError(t, err)
require.Equal(t, dataType, desc.RetTp)
}
}
}
Loading

0 comments on commit c6d3264

Please sign in to comment.