Skip to content

Commit

Permalink
Merge branch 'release-6.1' into release-6.1-87c5b5068ab8
Browse files Browse the repository at this point in the history
  • Loading branch information
heipark authored Sep 5, 2022
2 parents 2e18f53 + 696f08e commit 0744ca9
Show file tree
Hide file tree
Showing 132 changed files with 3,689 additions and 998 deletions.
1 change: 1 addition & 0 deletions .github/licenserc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,5 @@ header:
- 'dumpling/'
- 'tidb-binlog/driver/example'
- 'tidb-binlog/proto/go-binlog/secondary_binlog.pb.go'
- '**/*.sql'
comment: on-failure
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ dumpling_tidy:
dumpling_bins:
@which bin/tidb-server
@which bin/minio
@which bin/mc
@which bin/tidb-lightning
@which bin/sync_diff_inspector

Expand Down
23 changes: 23 additions & 0 deletions bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,29 @@ func TestExplain(t *testing.T) {
tk.MustExec("drop global binding for SELECT * from t1 union SELECT * from t1")
}

func TestBindSemiJoinRewrite(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("drop table if exists t2")
tk.MustExec("create table t1(id int)")
tk.MustExec("create table t2(id int)")
require.True(t, tk.HasKeywordInOperatorInfo("select * from t1 where exists(select 1 from t2 where t1.id=t2.id)", "semi join"))
require.True(t, tk.NotHasKeywordInOperatorInfo("select * from t1 where exists(select /*+ SEMI_JOIN_REWRITE() */ 1 from t2 where t1.id=t2.id)", "semi join"))

tk.MustExec(`
create global binding for
select * from t1 where exists(select 1 from t2 where t1.id=t2.id)
using
select * from t1 where exists(select /*+ SEMI_JOIN_REWRITE() */ 1 from t2 where t1.id=t2.id)
`)

require.True(t, tk.NotHasKeywordInOperatorInfo("select * from t1 where exists(select 1 from t2 where t1.id=t2.id)", "semi join"))
}

// TestBindingSymbolList tests sql with "?, ?, ?, ?", fixes #13871
func TestBindingSymbolList(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
Expand Down
47 changes: 41 additions & 6 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"math"
"os"
"path/filepath"
Expand Down Expand Up @@ -57,6 +58,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/mathutil"
tikverror "github.com/tikv/client-go/v2/error"
Expand Down Expand Up @@ -1103,7 +1105,7 @@ WriteAndIngest:
err = local.writeAndIngestPairs(ctx, engine, region, pairStart, end, regionSplitSize, regionSplitKeys)
local.ingestConcurrency.Recycle(w)
if err != nil {
if !common.IsRetryableError(err) {
if !local.isRetryableImportTiKVError(err) {
return err
}
_, regionStart, _ := codec.DecodeBytes(region.Region.StartKey, []byte{})
Expand Down Expand Up @@ -1133,6 +1135,19 @@ const (
retryIngest
)

func (local *local) isRetryableImportTiKVError(err error) bool {
err = errors.Cause(err)
// io.EOF is not retryable in normal case
// but on TiKV restart, if we're writing to TiKV(through GRPC)
// it might return io.EOF(it's GRPC Unavailable in most case),
// we need to retry on this error.
// see SendMsg in https://pkg.go.dev/google.golang.org/grpc#ClientStream
if err == io.EOF {
return true
}
return common.IsRetryableError(err)
}

func (local *local) writeAndIngestPairs(
ctx context.Context,
engine *Engine,
Expand All @@ -1150,7 +1165,7 @@ loopWrite:
var rangeStats rangeStats
metas, finishedRange, rangeStats, err = local.WriteToTiKV(ctx, engine, region, start, end, regionSplitSize, regionSplitKeys)
if err != nil {
if !common.IsRetryableError(err) {
if !local.isRetryableImportTiKVError(err) {
return err
}

Expand Down Expand Up @@ -1291,7 +1306,7 @@ func (local *local) writeAndIngestByRanges(ctx context.Context, engine *Engine,
if err == nil || common.IsContextCanceledError(err) {
return
}
if !common.IsRetryableError(err) {
if !local.isRetryableImportTiKVError(err) {
break
}
log.L().Warn("write and ingest by range failed",
Expand Down Expand Up @@ -1468,6 +1483,10 @@ func (local *local) ResolveDuplicateRows(ctx context.Context, tbl table.Table, t
if err == nil {
return nil
}
if types.ErrBadNumber.Equal(err) {
logger.Warn("delete duplicate rows encounter error", log.ShortError(err))
return common.ErrResolveDuplicateRows.Wrap(err).GenWithStackByArgs(tableName)
}
if log.IsContextCanceledError(err) {
return err
}
Expand Down Expand Up @@ -1798,18 +1817,34 @@ func (local *local) isIngestRetryable(
}
return retryTy, newRegion, common.ErrKVEpochNotMatch.GenWithStack(errPb.GetMessage())
case strings.Contains(errPb.Message, "raft: proposal dropped"):
// TODO: we should change 'Raft raft: proposal dropped' to a error type like 'NotLeader'
newRegion, err = getRegion()
if err != nil {
return retryNone, nil, errors.Trace(err)
}
return retryWrite, newRegion, errors.New(errPb.GetMessage())
return retryWrite, newRegion, common.ErrKVRaftProposalDropped.GenWithStack(errPb.GetMessage())
case errPb.ServerIsBusy != nil:
return retryNone, nil, common.ErrKVServerIsBusy.GenWithStack(errPb.GetMessage())
case errPb.RegionNotFound != nil:
return retryNone, nil, common.ErrKVRegionNotFound.GenWithStack(errPb.GetMessage())
case errPb.ReadIndexNotReady != nil:
// this error happens when this region is splitting, the error might be:
// read index not ready, reason can not read index due to split, region 64037
// we have paused schedule, but it's temporary,
// if next request takes a long time, there's chance schedule is enabled again
// or on key range border, another engine sharing this region tries to split this
// region may cause this error too.
newRegion, err = getRegion()
if err != nil {
return retryNone, nil, errors.Trace(err)
}
return retryWrite, newRegion, common.ErrKVReadIndexNotReady.GenWithStack(errPb.GetMessage())
case errPb.DiskFull != nil:
return retryNone, nil, errors.Errorf("non-retryable error: %s", resp.GetError().GetMessage())
}
return retryNone, nil, errors.Errorf("non-retryable error: %s", resp.GetError().GetMessage())
// all others ingest error, such as stale command, etc. we'll retry it again from writeAndIngestByRange
// here we use a single named-error ErrKVIngestFailed to represent them all
// we can separate them later if it's needed
return retryNone, nil, common.ErrKVIngestFailed.GenWithStack(resp.GetError().GetMessage())
}

// return the smallest []byte that is bigger than current bytes.
Expand Down
37 changes: 35 additions & 2 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"encoding/binary"
"io"
"math"
"math/rand"
"os"
Expand All @@ -38,6 +39,7 @@ import (
"github.com/pingcap/kvproto/pkg/errorpb"
sst "github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/metapb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
Expand Down Expand Up @@ -505,10 +507,35 @@ func TestIsIngestRetryable(t *testing.T) {
require.Equal(t, retryWrite, retryType)
require.Error(t, err)

resp.Error = &errorpb.Error{Message: "unknown error"}
resp.Error = &errorpb.Error{
ReadIndexNotReady: &errorpb.ReadIndexNotReady{
Reason: "test",
},
}
retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas)
require.Equal(t, retryWrite, retryType)
require.Error(t, err)

resp.Error = &errorpb.Error{
Message: "raft: proposal dropped",
}
retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas)
require.Equal(t, retryWrite, retryType)
require.True(t, berrors.Is(err, common.ErrKVRaftProposalDropped))

resp.Error = &errorpb.Error{
DiskFull: &errorpb.DiskFull{},
}
retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas)
require.Equal(t, retryNone, retryType)
require.Contains(t, err.Error(), "non-retryable error")

resp.Error = &errorpb.Error{
StaleCommand: &errorpb.StaleCommand{},
}
retryType, _, err = local.isIngestRetryable(ctx, resp, region, metas)
require.Equal(t, retryNone, retryType)
require.EqualError(t, err, "non-retryable error: unknown error")
require.True(t, berrors.Is(err, common.ErrKVIngestFailed))
}

type testIngester struct{}
Expand Down Expand Up @@ -1235,3 +1262,9 @@ func TestGetRegionSplitSizeKeys(t *testing.T) {
require.Equal(t, int64(1), splitSize)
require.Equal(t, int64(2), splitKeys)
}

func TestLocalIsRetryableTiKVWriteError(t *testing.T) {
l := local{}
require.True(t, l.isRetryableImportTiKVError(io.EOF))
require.True(t, l.isRetryableImportTiKVError(errors.Trace(io.EOF)))
}
46 changes: 25 additions & 21 deletions br/pkg/lightning/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,27 +70,31 @@ var (
ErrCreatePDClient = errors.Normalize("create pd client error", errors.RFCCodeText("Lightning:PD:ErrCreatePDClient"))
ErrPauseGC = errors.Normalize("pause gc error", errors.RFCCodeText("Lightning:PD:ErrPauseGC"))

ErrCheckKVVersion = errors.Normalize("check tikv version error", errors.RFCCodeText("Lightning:KV:ErrCheckKVVersion"))
ErrCreateKVClient = errors.Normalize("create kv client error", errors.RFCCodeText("Lightning:KV:ErrCreateKVClient"))
ErrCheckMultiIngest = errors.Normalize("check multi-ingest support error", errors.RFCCodeText("Lightning:KV:ErrCheckMultiIngest"))
ErrKVEpochNotMatch = errors.Normalize("epoch not match", errors.RFCCodeText("Lightning:KV:EpochNotMatch"))
ErrKVNotLeader = errors.Normalize("not leader", errors.RFCCodeText("Lightning:KV:NotLeader"))
ErrKVServerIsBusy = errors.Normalize("server is busy", errors.RFCCodeText("Lightning:KV:ServerIsBusy"))
ErrKVRegionNotFound = errors.Normalize("region not found", errors.RFCCodeText("Lightning:KV:RegionNotFound"))

ErrUnknownBackend = errors.Normalize("unknown backend %s", errors.RFCCodeText("Lightning:Restore:ErrUnknownBackend"))
ErrCheckLocalFile = errors.Normalize("cannot find local file for table: %s engineDir: %s", errors.RFCCodeText("Lightning:Restore:ErrCheckLocalFile"))
ErrOpenDuplicateDB = errors.Normalize("open duplicate db error", errors.RFCCodeText("Lightning:Restore:ErrOpenDuplicateDB"))
ErrSchemaNotExists = errors.Normalize("table `%s`.`%s` schema not found", errors.RFCCodeText("Lightning:Restore:ErrSchemaNotExists"))
ErrInvalidSchemaStmt = errors.Normalize("invalid schema statement: '%s'", errors.RFCCodeText("Lightning:Restore:ErrInvalidSchemaStmt"))
ErrCreateSchema = errors.Normalize("create schema failed, table: %s, stmt: %s", errors.RFCCodeText("Lightning:Restore:ErrCreateSchema"))
ErrUnknownColumns = errors.Normalize("unknown columns in header (%s) for table %s", errors.RFCCodeText("Lightning:Restore:ErrUnknownColumns"))
ErrChecksumMismatch = errors.Normalize("checksum mismatched remote vs local => (checksum: %d vs %d) (total_kvs: %d vs %d) (total_bytes:%d vs %d)", errors.RFCCodeText("Lighting:Restore:ErrChecksumMismatch"))
ErrRestoreTable = errors.Normalize("restore table %s failed", errors.RFCCodeText("Lightning:Restore:ErrRestoreTable"))
ErrEncodeKV = errors.Normalize("encode kv error in file %s at offset %d", errors.RFCCodeText("Lightning:Restore:ErrEncodeKV"))
ErrAllocTableRowIDs = errors.Normalize("allocate table row id error", errors.RFCCodeText("Lightning:Restore:ErrAllocTableRowIDs"))
ErrInvalidMetaStatus = errors.Normalize("invalid meta status: '%s'", errors.RFCCodeText("Lightning:Restore:ErrInvalidMetaStatus"))
ErrTableIsChecksuming = errors.Normalize("table '%s' is checksuming", errors.RFCCodeText("Lightning:Restore:ErrTableIsChecksuming"))
ErrCheckKVVersion = errors.Normalize("check tikv version error", errors.RFCCodeText("Lightning:KV:ErrCheckKVVersion"))
ErrCreateKVClient = errors.Normalize("create kv client error", errors.RFCCodeText("Lightning:KV:ErrCreateKVClient"))
ErrCheckMultiIngest = errors.Normalize("check multi-ingest support error", errors.RFCCodeText("Lightning:KV:ErrCheckMultiIngest"))
ErrKVEpochNotMatch = errors.Normalize("epoch not match", errors.RFCCodeText("Lightning:KV:EpochNotMatch"))
ErrKVNotLeader = errors.Normalize("not leader", errors.RFCCodeText("Lightning:KV:NotLeader"))
ErrKVServerIsBusy = errors.Normalize("server is busy", errors.RFCCodeText("Lightning:KV:ServerIsBusy"))
ErrKVRegionNotFound = errors.Normalize("region not found", errors.RFCCodeText("Lightning:KV:RegionNotFound"))
ErrKVReadIndexNotReady = errors.Normalize("read index not ready", errors.RFCCodeText("Lightning:KV:ReadIndexNotReady"))
ErrKVIngestFailed = errors.Normalize("ingest tikv failed", errors.RFCCodeText("Lightning:KV:ErrKVIngestFailed"))
ErrKVRaftProposalDropped = errors.Normalize("raft proposal dropped", errors.RFCCodeText("Lightning:KV:ErrKVRaftProposalDropped"))

ErrUnknownBackend = errors.Normalize("unknown backend %s", errors.RFCCodeText("Lightning:Restore:ErrUnknownBackend"))
ErrCheckLocalFile = errors.Normalize("cannot find local file for table: %s engineDir: %s", errors.RFCCodeText("Lightning:Restore:ErrCheckLocalFile"))
ErrOpenDuplicateDB = errors.Normalize("open duplicate db error", errors.RFCCodeText("Lightning:Restore:ErrOpenDuplicateDB"))
ErrSchemaNotExists = errors.Normalize("table `%s`.`%s` schema not found", errors.RFCCodeText("Lightning:Restore:ErrSchemaNotExists"))
ErrInvalidSchemaStmt = errors.Normalize("invalid schema statement: '%s'", errors.RFCCodeText("Lightning:Restore:ErrInvalidSchemaStmt"))
ErrCreateSchema = errors.Normalize("create schema failed, table: %s, stmt: %s", errors.RFCCodeText("Lightning:Restore:ErrCreateSchema"))
ErrUnknownColumns = errors.Normalize("unknown columns in header (%s) for table %s", errors.RFCCodeText("Lightning:Restore:ErrUnknownColumns"))
ErrChecksumMismatch = errors.Normalize("checksum mismatched remote vs local => (checksum: %d vs %d) (total_kvs: %d vs %d) (total_bytes:%d vs %d)", errors.RFCCodeText("Lighting:Restore:ErrChecksumMismatch"))
ErrRestoreTable = errors.Normalize("restore table %s failed", errors.RFCCodeText("Lightning:Restore:ErrRestoreTable"))
ErrEncodeKV = errors.Normalize("encode kv error in file %s at offset %d", errors.RFCCodeText("Lightning:Restore:ErrEncodeKV"))
ErrAllocTableRowIDs = errors.Normalize("allocate table row id error", errors.RFCCodeText("Lightning:Restore:ErrAllocTableRowIDs"))
ErrInvalidMetaStatus = errors.Normalize("invalid meta status: '%s'", errors.RFCCodeText("Lightning:Restore:ErrInvalidMetaStatus"))
ErrTableIsChecksuming = errors.Normalize("table '%s' is checksuming", errors.RFCCodeText("Lightning:Restore:ErrTableIsChecksuming"))
ErrResolveDuplicateRows = errors.Normalize("resolve duplicate rows error on table '%s'", errors.RFCCodeText("Lightning:Restore:ErrResolveDuplicateRows"))
)

type withStack struct {
Expand Down
31 changes: 27 additions & 4 deletions br/pkg/lightning/common/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"io"
"net"
"os"
"strings"
"syscall"

"github.com/go-sql-driver/mysql"
Expand All @@ -30,6 +31,28 @@ import (
"google.golang.org/grpc/status"
)

// some component doesn't have an accurate named error or transform a named error into string,
// so we need to check by error message,
// such as distsql.Checksum which transforms tikv other-error into its own error
var retryableErrorMsgList = []string{
"is not fully replicated",
// for cluster >= 4.x, lightning calls distsql.Checksum to do checksum
// this error happens on when distsql.Checksum calls TiKV
// see https://github.com/pingcap/tidb/blob/2c3d4f1ae418881a95686e8b93d4237f2e76eec6/store/copr/coprocessor.go#L941
"coprocessor task terminated due to exceeding the deadline",
}

func isRetryableFromErrorMessage(err error) bool {
msg := err.Error()
msgLower := strings.ToLower(msg)
for _, errStr := range retryableErrorMsgList {
if strings.Contains(msgLower, errStr) {
return true
}
}
return false
}

// IsRetryableError returns whether the error is transient (e.g. network
// connection dropped) or irrecoverable (e.g. user pressing Ctrl+C). This
// function returns `false` (irrecoverable) if `err == nil`.
Expand Down Expand Up @@ -80,7 +103,9 @@ func isSingleRetryableError(err error) bool {
case *errors.Error:
switch {
case berrors.Is(nerr, ErrKVEpochNotMatch), berrors.Is(nerr, ErrKVNotLeader),
berrors.Is(nerr, ErrKVRegionNotFound), berrors.Is(nerr, ErrKVServerIsBusy):
berrors.Is(nerr, ErrKVRegionNotFound), berrors.Is(nerr, ErrKVServerIsBusy),
berrors.Is(nerr, ErrKVReadIndexNotReady), berrors.Is(nerr, ErrKVIngestFailed),
berrors.Is(nerr, ErrKVRaftProposalDropped):
// common.ErrKVServerIsBusy is a little duplication with tmysql.ErrTiKVServerBusy
// it's because the response of sst.ingest gives us a sst.IngestResponse which doesn't contain error code,
// so we have to transform it into a defined code
Expand All @@ -91,10 +116,8 @@ func isSingleRetryableError(err error) bool {
switch status.Code(err) {
case codes.DeadlineExceeded, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss:
return true
case codes.Unknown:
return false
default:
return false
return isRetryableFromErrorMessage(err)
}
}
}
9 changes: 9 additions & 0 deletions br/pkg/lightning/common/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,16 @@ func TestIsRetryableError(t *testing.T) {
require.True(t, IsRetryableError(ErrKVEpochNotMatch))
require.True(t, IsRetryableError(ErrKVServerIsBusy))
require.True(t, IsRetryableError(ErrKVRegionNotFound))
require.True(t, IsRetryableError(ErrKVReadIndexNotReady))
require.True(t, IsRetryableError(ErrKVIngestFailed))
require.True(t, IsRetryableError(ErrKVRaftProposalDropped))
require.True(t, IsRetryableError(ErrKVNotLeader.GenWithStack("test")))
require.True(t, IsRetryableError(ErrKVEpochNotMatch.GenWithStack("test")))
require.True(t, IsRetryableError(ErrKVServerIsBusy.GenWithStack("test")))
require.True(t, IsRetryableError(ErrKVRegionNotFound.GenWithStack("test")))
require.True(t, IsRetryableError(ErrKVReadIndexNotReady.GenWithStack("test")))
require.True(t, IsRetryableError(ErrKVIngestFailed.GenWithStack("test")))
require.True(t, IsRetryableError(ErrKVRaftProposalDropped.GenWithStack("test")))

// net: connection refused
_, err := net.Dial("tcp", "localhost:65533")
Expand Down Expand Up @@ -94,4 +100,7 @@ func TestIsRetryableError(t *testing.T) {
require.False(t, IsRetryableError(multierr.Combine(context.Canceled, context.Canceled)))
require.True(t, IsRetryableError(multierr.Combine(&net.DNSError{IsTimeout: true}, &net.DNSError{IsTimeout: true})))
require.False(t, IsRetryableError(multierr.Combine(context.Canceled, &net.DNSError{IsTimeout: true})))

require.True(t, IsRetryableError(errors.Errorf("region %d is not fully replicated", 1234)))
require.True(t, IsRetryableError(errors.New("other error: Coprocessor task terminated due to exceeding the deadline")))
}
3 changes: 2 additions & 1 deletion br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,8 @@ func (l *Lightning) handlePostTask(w http.ResponseWriter, req *http.Request) {
writeJSONError(w, http.StatusBadRequest, "cannot read request", err)
return
}
log.L().Info("received task config", zap.ByteString("content", data))
filteredData := utils.HideSensitive(string(data))
log.L().Info("received task config", zap.String("content", filteredData))

cfg := config.NewConfig()
if err = cfg.LoadFromGlobal(l.globalCfg); err != nil {
Expand Down
7 changes: 3 additions & 4 deletions br/pkg/lightning/mydump/parquet_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,10 @@ func NewParquetParser(

columns := make([]string, 0, len(reader.Footer.Schema)-1)
columnMetas := make([]*parquet.SchemaElement, 0, len(reader.Footer.Schema)-1)
for _, c := range reader.SchemaHandler.SchemaElements {
for i, c := range reader.SchemaHandler.SchemaElements {
if c.GetNumChildren() == 0 {
// NOTE: the SchemaElement.Name is capitalized, SchemaHandler.Infos.ExName is the raw column name
// though in this context, there is no difference between these two fields
columns = append(columns, strings.ToLower(c.Name))
// we need to use the raw name, SchemaElement.Name might be prefixed with PARGO_PERFIX_
columns = append(columns, strings.ToLower(reader.SchemaHandler.GetExName(i)))
// transfer old ConvertedType to LogicalType
columnMeta := c
if c.ConvertedType != nil && c.LogicalType == nil {
Expand Down
Loading

0 comments on commit 0744ca9

Please sign in to comment.