Skip to content

Commit

Permalink
Merge branch 'release-6.1' into release-6.1-ed5e63a7a0a2
Browse files Browse the repository at this point in the history
  • Loading branch information
mengxin9014 authored Oct 9, 2022
2 parents aafe5a2 + 38ef5d6 commit a7ba314
Show file tree
Hide file tree
Showing 186 changed files with 4,674 additions and 1,521 deletions.
1 change: 1 addition & 0 deletions .github/licenserc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,5 @@ header:
- 'dumpling/'
- 'tidb-binlog/driver/example'
- 'tidb-binlog/proto/go-binlog/secondary_binlog.pb.go'
- '**/*.sql'
comment: on-failure
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ dumpling_tidy:
dumpling_bins:
@which bin/tidb-server
@which bin/minio
@which bin/mc
@which bin/tidb-lightning
@which bin/sync_diff_inspector

Expand Down
23 changes: 23 additions & 0 deletions bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,29 @@ func TestExplain(t *testing.T) {
tk.MustExec("drop global binding for SELECT * from t1 union SELECT * from t1")
}

func TestBindSemiJoinRewrite(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("drop table if exists t2")
tk.MustExec("create table t1(id int)")
tk.MustExec("create table t2(id int)")
require.True(t, tk.HasKeywordInOperatorInfo("select * from t1 where exists(select 1 from t2 where t1.id=t2.id)", "semi join"))
require.True(t, tk.NotHasKeywordInOperatorInfo("select * from t1 where exists(select /*+ SEMI_JOIN_REWRITE() */ 1 from t2 where t1.id=t2.id)", "semi join"))

tk.MustExec(`
create global binding for
select * from t1 where exists(select 1 from t2 where t1.id=t2.id)
using
select * from t1 where exists(select /*+ SEMI_JOIN_REWRITE() */ 1 from t2 where t1.id=t2.id)
`)

require.True(t, tk.NotHasKeywordInOperatorInfo("select * from t1 where exists(select 1 from t2 where t1.id=t2.id)", "semi join"))
}

// TestBindingSymbolList tests sql with "?, ?, ?, ?", fixes #13871
func TestBindingSymbolList(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
Expand Down
6 changes: 6 additions & 0 deletions br/pkg/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ type Glue interface {

// GetVersion gets BR package version to run backup/restore job
GetVersion() string

// UseOneShotSession temporary creates session from store when run backup job.
// because we don't have to own domain/session during the whole backup.
// we can close domain as soon as possible.
// and we must reuse the exists session and don't close it in SQL backup job.
UseOneShotSession(store kv.Storage, closeDomain bool, fn func(se Session) error) error
}

// Session is an abstraction of the session.Session interface.
Expand Down
39 changes: 39 additions & 0 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,45 @@ func (g Glue) GetVersion() string {
return g.tikvGlue.GetVersion()
}

// UseOneShotSession implements glue.Glue.
func (g Glue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue.Session) error) error {
se, err := session.CreateSession(store)
if err != nil {
return errors.Trace(err)
}
glueSession := &tidbSession{
se: se,
}
defer func() {
se.Close()
log.Info("one shot session closed")
}()
// dom will be created during session.CreateSession.
dom, err := session.GetDomain(store)
if err != nil {
return errors.Trace(err)
}
// because domain was created during the whole program exists.
// and it will register br info to info syncer.
// we'd better close it as soon as possible.
if closeDomain {
defer func() {
dom.Close()
log.Info("one shot domain closed")
}()
}
err = fn(glueSession)
if err != nil {
return errors.Trace(err)
}
return nil
}

// GetSessionCtx implements glue.Glue
func (gs *tidbSession) GetSessionCtx() sessionctx.Context {
return gs.se
}

// Execute implements glue.Session.
func (gs *tidbSession) Execute(ctx context.Context, sql string) error {
return gs.ExecuteInternal(ctx, sql)
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/gluetikv/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,8 @@ func (Glue) Record(name string, val uint64) {
func (Glue) GetVersion() string {
return "BR\n" + build.Info()
}

// UseOneShotSession implements glue.Glue.
func (g Glue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue.Session) error) error {
return nil
}
47 changes: 41 additions & 6 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"math"
"os"
"path/filepath"
Expand Down Expand Up @@ -57,6 +58,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/mathutil"
tikverror "github.com/tikv/client-go/v2/error"
Expand Down Expand Up @@ -1103,7 +1105,7 @@ WriteAndIngest:
err = local.writeAndIngestPairs(ctx, engine, region, pairStart, end, regionSplitSize, regionSplitKeys)
local.ingestConcurrency.Recycle(w)
if err != nil {
if !common.IsRetryableError(err) {
if !local.isRetryableImportTiKVError(err) {
return err
}
_, regionStart, _ := codec.DecodeBytes(region.Region.StartKey, []byte{})
Expand Down Expand Up @@ -1133,6 +1135,19 @@ const (
retryIngest
)

func (local *local) isRetryableImportTiKVError(err error) bool {
err = errors.Cause(err)
// io.EOF is not retryable in normal case
// but on TiKV restart, if we're writing to TiKV(through GRPC)
// it might return io.EOF(it's GRPC Unavailable in most case),
// we need to retry on this error.
// see SendMsg in https://pkg.go.dev/google.golang.org/grpc#ClientStream
if err == io.EOF {
return true
}
return common.IsRetryableError(err)
}

func (local *local) writeAndIngestPairs(
ctx context.Context,
engine *Engine,
Expand All @@ -1150,7 +1165,7 @@ loopWrite:
var rangeStats rangeStats
metas, finishedRange, rangeStats, err = local.WriteToTiKV(ctx, engine, region, start, end, regionSplitSize, regionSplitKeys)
if err != nil {
if !common.IsRetryableError(err) {
if !local.isRetryableImportTiKVError(err) {
return err
}

Expand Down Expand Up @@ -1291,7 +1306,7 @@ func (local *local) writeAndIngestByRanges(ctx context.Context, engine *Engine,
if err == nil || common.IsContextCanceledError(err) {
return
}
if !common.IsRetryableError(err) {
if !local.isRetryableImportTiKVError(err) {
break
}
log.L().Warn("write and ingest by range failed",
Expand Down Expand Up @@ -1468,6 +1483,10 @@ func (local *local) ResolveDuplicateRows(ctx context.Context, tbl table.Table, t
if err == nil {
return nil
}
if types.ErrBadNumber.Equal(err) {
logger.Warn("delete duplicate rows encounter error", log.ShortError(err))
return common.ErrResolveDuplicateRows.Wrap(err).GenWithStackByArgs(tableName)
}
if log.IsContextCanceledError(err) {
return err
}
Expand Down Expand Up @@ -1798,18 +1817,34 @@ func (local *local) isIngestRetryable(
}
return retryTy, newRegion, common.ErrKVEpochNotMatch.GenWithStack(errPb.GetMessage())
case strings.Contains(errPb.Message, "raft: proposal dropped"):
// TODO: we should change 'Raft raft: proposal dropped' to a error type like 'NotLeader'
newRegion, err = getRegion()
if err != nil {
return retryNone, nil, errors.Trace(err)
}
return retryWrite, newRegion, errors.New(errPb.GetMessage())
return retryWrite, newRegion, common.ErrKVRaftProposalDropped.GenWithStack(errPb.GetMessage())
case errPb.ServerIsBusy != nil:
return retryNone, nil, common.ErrKVServerIsBusy.GenWithStack(errPb.GetMessage())
case errPb.RegionNotFound != nil:
return retryNone, nil, common.ErrKVRegionNotFound.GenWithStack(errPb.GetMessage())
case errPb.ReadIndexNotReady != nil:
// this error happens when this region is splitting, the error might be:
// read index not ready, reason can not read index due to split, region 64037
// we have paused schedule, but it's temporary,
// if next request takes a long time, there's chance schedule is enabled again
// or on key range border, another engine sharing this region tries to split this
// region may cause this error too.
newRegion, err = getRegion()
if err != nil {
return retryNone, nil, errors.Trace(err)
}
return retryWrite, newRegion, common.ErrKVReadIndexNotReady.GenWithStack(errPb.GetMessage())
case errPb.DiskFull != nil:
return retryNone, nil, errors.Errorf("non-retryable error: %s", resp.GetError().GetMessage())
}
return retryNone, nil, errors.Errorf("non-retryable error: %s", resp.GetError().GetMessage())
// all others ingest error, such as stale command, etc. we'll retry it again from writeAndIngestByRange
// here we use a single named-error ErrKVIngestFailed to represent them all
// we can separate them later if it's needed
return retryNone, nil, common.ErrKVIngestFailed.GenWithStack(resp.GetError().GetMessage())
}

// return the smallest []byte that is bigger than current bytes.
Expand Down
37 changes: 35 additions & 2 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"encoding/binary"
"io"
"math"
"math/rand"
"os"
Expand All @@ -38,6 +39,7 @@ import (
"github.com/pingcap/kvproto/pkg/errorpb"
sst "github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/metapb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
Expand Down Expand Up @@ -505,10 +507,35 @@ func TestIsIngestRetryable(t *testing.T) {
require.Equal(t, retryWrite, retryType)
require.Error(t, err)

resp.Error = &errorpb.Error{Message: "unknown error"}
resp.Error = &errorpb.Error{
ReadIndexNotReady: &errorpb.ReadIndexNotReady{
Reason: "test",
},
}
retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas)
require.Equal(t, retryWrite, retryType)
require.Error(t, err)

resp.Error = &errorpb.Error{
Message: "raft: proposal dropped",
}
retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas)
require.Equal(t, retryWrite, retryType)
require.True(t, berrors.Is(err, common.ErrKVRaftProposalDropped))

resp.Error = &errorpb.Error{
DiskFull: &errorpb.DiskFull{},
}
retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas)
require.Equal(t, retryNone, retryType)
require.Contains(t, err.Error(), "non-retryable error")

resp.Error = &errorpb.Error{
StaleCommand: &errorpb.StaleCommand{},
}
retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas)
require.Equal(t, retryNone, retryType)
require.EqualError(t, err, "non-retryable error: unknown error")
require.True(t, berrors.Is(err, common.ErrKVIngestFailed))
}

type testIngester struct{}
Expand Down Expand Up @@ -1235,3 +1262,9 @@ func TestGetRegionSplitSizeKeys(t *testing.T) {
require.Equal(t, int64(1), splitSize)
require.Equal(t, int64(2), splitKeys)
}

func TestLocalIsRetryableTiKVWriteError(t *testing.T) {
l := local{}
require.True(t, l.isRetryableImportTiKVError(io.EOF))
require.True(t, l.isRetryableImportTiKVError(errors.Trace(io.EOF)))
}
46 changes: 25 additions & 21 deletions br/pkg/lightning/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,27 +70,31 @@ var (
ErrCreatePDClient = errors.Normalize("create pd client error", errors.RFCCodeText("Lightning:PD:ErrCreatePDClient"))
ErrPauseGC = errors.Normalize("pause gc error", errors.RFCCodeText("Lightning:PD:ErrPauseGC"))

ErrCheckKVVersion = errors.Normalize("check tikv version error", errors.RFCCodeText("Lightning:KV:ErrCheckKVVersion"))
ErrCreateKVClient = errors.Normalize("create kv client error", errors.RFCCodeText("Lightning:KV:ErrCreateKVClient"))
ErrCheckMultiIngest = errors.Normalize("check multi-ingest support error", errors.RFCCodeText("Lightning:KV:ErrCheckMultiIngest"))
ErrKVEpochNotMatch = errors.Normalize("epoch not match", errors.RFCCodeText("Lightning:KV:EpochNotMatch"))
ErrKVNotLeader = errors.Normalize("not leader", errors.RFCCodeText("Lightning:KV:NotLeader"))
ErrKVServerIsBusy = errors.Normalize("server is busy", errors.RFCCodeText("Lightning:KV:ServerIsBusy"))
ErrKVRegionNotFound = errors.Normalize("region not found", errors.RFCCodeText("Lightning:KV:RegionNotFound"))

ErrUnknownBackend = errors.Normalize("unknown backend %s", errors.RFCCodeText("Lightning:Restore:ErrUnknownBackend"))
ErrCheckLocalFile = errors.Normalize("cannot find local file for table: %s engineDir: %s", errors.RFCCodeText("Lightning:Restore:ErrCheckLocalFile"))
ErrOpenDuplicateDB = errors.Normalize("open duplicate db error", errors.RFCCodeText("Lightning:Restore:ErrOpenDuplicateDB"))
ErrSchemaNotExists = errors.Normalize("table `%s`.`%s` schema not found", errors.RFCCodeText("Lightning:Restore:ErrSchemaNotExists"))
ErrInvalidSchemaStmt = errors.Normalize("invalid schema statement: '%s'", errors.RFCCodeText("Lightning:Restore:ErrInvalidSchemaStmt"))
ErrCreateSchema = errors.Normalize("create schema failed, table: %s, stmt: %s", errors.RFCCodeText("Lightning:Restore:ErrCreateSchema"))
ErrUnknownColumns = errors.Normalize("unknown columns in header (%s) for table %s", errors.RFCCodeText("Lightning:Restore:ErrUnknownColumns"))
ErrChecksumMismatch = errors.Normalize("checksum mismatched remote vs local => (checksum: %d vs %d) (total_kvs: %d vs %d) (total_bytes:%d vs %d)", errors.RFCCodeText("Lighting:Restore:ErrChecksumMismatch"))
ErrRestoreTable = errors.Normalize("restore table %s failed", errors.RFCCodeText("Lightning:Restore:ErrRestoreTable"))
ErrEncodeKV = errors.Normalize("encode kv error in file %s at offset %d", errors.RFCCodeText("Lightning:Restore:ErrEncodeKV"))
ErrAllocTableRowIDs = errors.Normalize("allocate table row id error", errors.RFCCodeText("Lightning:Restore:ErrAllocTableRowIDs"))
ErrInvalidMetaStatus = errors.Normalize("invalid meta status: '%s'", errors.RFCCodeText("Lightning:Restore:ErrInvalidMetaStatus"))
ErrTableIsChecksuming = errors.Normalize("table '%s' is checksuming", errors.RFCCodeText("Lightning:Restore:ErrTableIsChecksuming"))
ErrCheckKVVersion = errors.Normalize("check tikv version error", errors.RFCCodeText("Lightning:KV:ErrCheckKVVersion"))
ErrCreateKVClient = errors.Normalize("create kv client error", errors.RFCCodeText("Lightning:KV:ErrCreateKVClient"))
ErrCheckMultiIngest = errors.Normalize("check multi-ingest support error", errors.RFCCodeText("Lightning:KV:ErrCheckMultiIngest"))
ErrKVEpochNotMatch = errors.Normalize("epoch not match", errors.RFCCodeText("Lightning:KV:EpochNotMatch"))
ErrKVNotLeader = errors.Normalize("not leader", errors.RFCCodeText("Lightning:KV:NotLeader"))
ErrKVServerIsBusy = errors.Normalize("server is busy", errors.RFCCodeText("Lightning:KV:ServerIsBusy"))
ErrKVRegionNotFound = errors.Normalize("region not found", errors.RFCCodeText("Lightning:KV:RegionNotFound"))
ErrKVReadIndexNotReady = errors.Normalize("read index not ready", errors.RFCCodeText("Lightning:KV:ReadIndexNotReady"))
ErrKVIngestFailed = errors.Normalize("ingest tikv failed", errors.RFCCodeText("Lightning:KV:ErrKVIngestFailed"))
ErrKVRaftProposalDropped = errors.Normalize("raft proposal dropped", errors.RFCCodeText("Lightning:KV:ErrKVRaftProposalDropped"))

ErrUnknownBackend = errors.Normalize("unknown backend %s", errors.RFCCodeText("Lightning:Restore:ErrUnknownBackend"))
ErrCheckLocalFile = errors.Normalize("cannot find local file for table: %s engineDir: %s", errors.RFCCodeText("Lightning:Restore:ErrCheckLocalFile"))
ErrOpenDuplicateDB = errors.Normalize("open duplicate db error", errors.RFCCodeText("Lightning:Restore:ErrOpenDuplicateDB"))
ErrSchemaNotExists = errors.Normalize("table `%s`.`%s` schema not found", errors.RFCCodeText("Lightning:Restore:ErrSchemaNotExists"))
ErrInvalidSchemaStmt = errors.Normalize("invalid schema statement: '%s'", errors.RFCCodeText("Lightning:Restore:ErrInvalidSchemaStmt"))
ErrCreateSchema = errors.Normalize("create schema failed, table: %s, stmt: %s", errors.RFCCodeText("Lightning:Restore:ErrCreateSchema"))
ErrUnknownColumns = errors.Normalize("unknown columns in header (%s) for table %s", errors.RFCCodeText("Lightning:Restore:ErrUnknownColumns"))
ErrChecksumMismatch = errors.Normalize("checksum mismatched remote vs local => (checksum: %d vs %d) (total_kvs: %d vs %d) (total_bytes:%d vs %d)", errors.RFCCodeText("Lighting:Restore:ErrChecksumMismatch"))
ErrRestoreTable = errors.Normalize("restore table %s failed", errors.RFCCodeText("Lightning:Restore:ErrRestoreTable"))
ErrEncodeKV = errors.Normalize("encode kv error in file %s at offset %d", errors.RFCCodeText("Lightning:Restore:ErrEncodeKV"))
ErrAllocTableRowIDs = errors.Normalize("allocate table row id error", errors.RFCCodeText("Lightning:Restore:ErrAllocTableRowIDs"))
ErrInvalidMetaStatus = errors.Normalize("invalid meta status: '%s'", errors.RFCCodeText("Lightning:Restore:ErrInvalidMetaStatus"))
ErrTableIsChecksuming = errors.Normalize("table '%s' is checksuming", errors.RFCCodeText("Lightning:Restore:ErrTableIsChecksuming"))
ErrResolveDuplicateRows = errors.Normalize("resolve duplicate rows error on table '%s'", errors.RFCCodeText("Lightning:Restore:ErrResolveDuplicateRows"))
)

type withStack struct {
Expand Down
Loading

0 comments on commit a7ba314

Please sign in to comment.