Merge remote-tracking branch 'upstream/master' into test-fix-ut-create-drop-create
tangenta committed Feb 19, 2024
2 parents 93082a4 + e3e0f7e commit 68af0d4
Showing 102 changed files with 1,609 additions and 1,046 deletions.
31 changes: 30 additions & 1 deletion br/pkg/lightning/backend/kv/sql2kv.go
@@ -153,6 +153,21 @@ type Pairs struct {
MemBuf *MemBuf
}

// GroupedPairs is a map from index ID to KvPairs.
type GroupedPairs map[int64][]common.KvPair

// SplitIntoChunks implements the encode.Rows interface. It just satisfies the
// type system and should never be called.
func (GroupedPairs) SplitIntoChunks(int) []encode.Rows {
panic("not implemented")
}

// Clear implements the encode.Rows interface. It just satisfies the type system
// and should never be called.
func (GroupedPairs) Clear() encode.Rows {
panic("not implemented")
}

// MakeRowsFromKvPairs converts a KvPair slice into a Rows instance. This is
// mainly used for testing only. The resulting Rows instance should only be used
// for the importer backend.
@@ -171,7 +186,21 @@ func MakeRowFromKvPairs(pairs []common.KvPair) encode.Row {
// back into a slice of KvPair. This method panics if the Rows is not
constructed in such a way.
func Rows2KvPairs(rows encode.Rows) []common.KvPair {
return rows.(*Pairs).Pairs
switch v := rows.(type) {
case *Pairs:
return v.Pairs
case GroupedPairs:
cnt := 0
for _, pairs := range v {
cnt += len(pairs)
}
res := make([]common.KvPair, 0, cnt)
for _, pairs := range v {
res = append(res, pairs...)
}
return res
}
panic(fmt.Sprintf("unknown Rows type %T", rows))
}

// Row2KvPairs converts a Row instance constructed from MakeRowFromKvPairs
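
Note on the sql2kv.go hunk above: the new GroupedPairs type buckets KV pairs by index ID, and the extra switch arm in Rows2KvPairs flattens those buckets into a single slice with one pre-sized allocation. Below is a minimal standalone sketch of that flattening step; the KvPair struct is a stand-in for the real common.KvPair, not the actual type.

package main

import "fmt"

// KvPair is a stand-in for the real common.KvPair type used in the diff
// (assumption: only Key and Val matter for this illustration).
type KvPair struct {
	Key []byte
	Val []byte
}

// GroupedPairs mirrors the new map-from-index-ID-to-KV-pairs shape.
type GroupedPairs map[int64][]KvPair

// flatten copies every bucket into one slice, counting first so the result
// is allocated exactly once, as the new Rows2KvPairs arm does.
func flatten(g GroupedPairs) []KvPair {
	cnt := 0
	for _, pairs := range g {
		cnt += len(pairs)
	}
	res := make([]KvPair, 0, cnt)
	for _, pairs := range g {
		res = append(res, pairs...)
	}
	return res
}

func main() {
	g := GroupedPairs{
		1: {{Key: []byte("k1"), Val: []byte("v1")}},
		2: {{Key: []byte("k2"), Val: []byte("v2")}, {Key: []byte("k3"), Val: []byte("v3")}},
	}
	fmt.Println(len(flatten(g))) // 3
}
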
3 changes: 0 additions & 3 deletions br/pkg/lightning/backend/local/BUILD.bazel
@@ -50,11 +50,9 @@ go_library(
"//pkg/kv",
"//pkg/metrics",
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/sessionctx/variable",
"//pkg/table",
"//pkg/tablecodec",
"//pkg/types",
"//pkg/util",
"//pkg/util/codec",
"//pkg/util/compress",
@@ -76,7 +74,6 @@ go_library(
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_kvproto//pkg/pdpb",
"@com_github_pingcap_tipb//go-tipb",
"@com_github_tikv_client_go_v2//error",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
117 changes: 8 additions & 109 deletions br/pkg/lightning/backend/local/duplicate.go
@@ -21,7 +21,6 @@ import (
"fmt"
"io"
"math"
"slices"
"sync"

"github.com/cockroachdb/pebble"
@@ -43,20 +42,16 @@ import (
"github.com/pingcap/tidb/pkg/distsql"
tidbkv "github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/hack"
"github.com/pingcap/tidb/pkg/util/ranger"
tikverror "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/tikv"
kvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
)

const (
@@ -1004,29 +999,14 @@ func (local *DupeController) ResolveDuplicateRows(ctx context.Context, tbl table
}()

switch algorithm {
case config.DupeResAlgRecord, config.DupeResAlgNone:
case config.DupeResAlgNone:
logger.Warn("skipping resolution due to selected algorithm. this table will become inconsistent!", zap.String("category", "resolve-dupe"), zap.Stringer("algorithm", algorithm))
return nil
case config.DupeResAlgRemove:
case config.DupeResAlgReplace:
case config.DupeResAlgReplace, config.DupeResAlgErr:
default:
panic(fmt.Sprintf("[resolve-dupe] unknown resolution algorithm %v", algorithm))
}

// TODO: reuse the *kv.SessionOptions from NewEncoder for picking the correct time zone.
decoder, err := kv.NewTableKVDecoder(tbl, tableName, &encode.SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
}, log.FromContext(ctx))
if err != nil {
return errors.Trace(err)
}

tableIDs := physicalTableIDs(tbl.Meta())
keyInTable := func(key []byte) bool {
return slices.Contains(tableIDs, tablecodec.DecodeTableID(key))
}

errLimiter := rate.NewLimiter(1, 1)
pool := utils.NewWorkerPool(uint(local.dupeConcurrency), "resolve duplicate rows")

tblInfo, err := json.Marshal(tbl.Meta())
@@ -1037,31 +1017,6 @@ func (local *DupeController) ResolveDuplicateRows(ctx context.Context, tbl table
zap.ByteString("tblInfo", tblInfo))

switch algorithm {
case config.DupeResAlgRemove:
err = local.errorMgr.RemoveAllConflictKeys(
ctx, tableName, pool,
func(ctx context.Context, handleRows [][2][]byte) error {
for {
err := local.deleteDuplicateRows(ctx, logger, handleRows, decoder, keyInTable)
if err == nil {
return nil
}
if types.ErrBadNumber.Equal(err) {
logger.Warn("delete duplicate rows encounter error", log.ShortError(err))
return common.ErrResolveDuplicateRows.Wrap(errors.Trace(err)).GenWithStackByArgs(tableName)
}
if log.IsContextCanceledError(err) {
return errors.Trace(err)
}
if !tikverror.IsErrWriteConflict(errors.Cause(err)) {
logger.Warn("delete duplicate rows encounter error", log.ShortError(errors.Trace(err)))
}
if err = errLimiter.Wait(ctx); err != nil {
return errors.Trace(err)
}
}
},
)
case config.DupeResAlgReplace:
err = local.errorMgr.ReplaceConflictKeys(
ctx, tbl, tableName, pool,
@@ -1075,12 +1030,16 @@ func (local *DupeController) ResolveDuplicateRows(ctx context.Context, tbl table
func(ctx context.Context, key []byte) error {
err := local.deleteDuplicateRow(ctx, logger, key)
if err != nil {
logger.Debug("delete duplicate rows encounter error", zap.Error(err))
return errors.Trace(err)
logger.Warn("delete duplicate rows encounter error", log.ShortError(err))
return common.ErrResolveDuplicateRows.Wrap(errors.Trace(err)).GenWithStackByArgs(tableName)
}
return nil
},
)
case config.DupeResAlgErr:
err = local.errorMgr.ResolveConflictKeysError(
ctx, tableName,
)
}

return errors.Trace(err)
@@ -1130,63 +1089,3 @@ func (local *DupeController) deleteDuplicateRow(

return errors.Trace(err)
}

func (local *DupeController) deleteDuplicateRows(
ctx context.Context,
logger *log.Task,
handleRows [][2][]byte,
decoder *kv.TableKVDecoder,
keyInTable func(key []byte) bool,
) (err error) {
// Starts a Delete transaction.
txn, err := local.tikvCli.Begin()
if err != nil {
return errors.Trace(err)
}
defer func() {
if err == nil {
err = txn.Commit(ctx)
} else {
if rollbackErr := txn.Rollback(); rollbackErr != nil {
logger.Warn("failed to rollback transaction", zap.Error(rollbackErr))
}
}
}()

deleteKey := func(key []byte) error {
logger.Debug("will delete key", zap.String("category", "resolve-dupe"), logutil.Key("key", key))
return txn.Delete(key)
}

// Collect all rows & index keys into the deletion transaction.
// (if the number of duplicates is small this should fit entirely in memory)
// (Txn's MemBuf's bufferSizeLimit is currently infinity)
for _, handleRow := range handleRows {
// Skip the row key if it's not in the table.
// This can happen if the table has been recreated or truncated,
// and the duplicate key is from the old table.
if !keyInTable(handleRow[0]) {
continue
}
logger.Debug("found row to resolve", zap.String("category", "resolve-dupe"),
logutil.Key("handle", handleRow[0]),
logutil.Key("row", handleRow[1]))

if err := deleteKey(handleRow[0]); err != nil {
return errors.Trace(err)
}

handle, err := decoder.DecodeHandleFromRowKey(handleRow[0])
if err != nil {
return errors.Trace(err)
}

err = decoder.IterRawIndexKeys(handle, handleRow[1], deleteKey)
if err != nil {
return errors.Trace(err)
}
}

logger.Debug("number of KV pairs to be deleted", zap.String("category", "resolve-dupe"), zap.Int("count", txn.Len()))
return nil
}
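
Context for the duplicate.go hunks above: the old record/remove resolution paths (and the row-deletion helper they used) are removed, leaving the none, replace, and new err algorithms. What follows is a compact, hedged sketch of the resulting dispatch shape; it is a simplification that merges the diff's validation switch and dispatch switch into one and uses stand-in names rather than the real config constants.

package main

import (
	"errors"
	"fmt"
)

// algorithm stands in for config's duplicate-resolution enum; only the
// variants kept by this commit are modeled.
type algorithm int

const (
	algNone    algorithm = iota // skip resolution; the table may become inconsistent
	algReplace                  // keep one row per conflict and delete the rest
	algErr                      // report conflicts as an error and stop
)

// resolve dispatches on the chosen algorithm, roughly mirroring the shape of
// ResolveDuplicateRows after this commit.
func resolve(alg algorithm, replaceFn, errFn func() error) error {
	switch alg {
	case algNone:
		fmt.Println("skipping resolution; this table may become inconsistent")
		return nil
	case algReplace:
		return replaceFn()
	case algErr:
		return errFn()
	default:
		panic(fmt.Sprintf("unknown resolution algorithm %v", alg))
	}
}

func main() {
	err := resolve(algErr,
		func() error { return nil },
		func() error { return errors.New("duplicate keys found, aborting import") },
	)
	fmt.Println(err)
}
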
37 changes: 21 additions & 16 deletions br/pkg/lightning/backend/local/local.go
@@ -459,13 +459,12 @@ func (c *BackendConfig) adjust() {

// Backend is a local backend.
type Backend struct {
pdCli pd.Client
pdHTTPCli pdhttp.Client
splitCli split.SplitClient
tikvCli *tikvclient.KVStore
tls *common.TLS
regionSizeGetter TableRegionSizeGetter
tikvCodec tikvclient.Codec
pdCli pd.Client
pdHTTPCli pdhttp.Client
splitCli split.SplitClient
tikvCli *tikvclient.KVStore
tls *common.TLS
tikvCodec tikvclient.Codec

BackendConfig
engineMgr *engineManager
@@ -500,10 +499,17 @@ func NewBackend(
ctx context.Context,
tls *common.TLS,
config BackendConfig,
regionSizeGetter TableRegionSizeGetter,
pdSvcDiscovery pd.ServiceDiscovery,
) (b *Backend, err error) {
config.adjust()
pdAddrs := strings.Split(config.PDAddr, ",")
var pdAddrs []string
if pdSvcDiscovery != nil {
pdAddrs = pdSvcDiscovery.GetServiceURLs()
// TODO(lance6716): if PD client can support creating a client with external
// service discovery, we can directly pass pdSvcDiscovery.
} else {
pdAddrs = strings.Split(config.PDAddr, ",")
}
pdCli, err := pd.NewClientWithContext(
ctx, pdAddrs, tls.ToPDSecurityOption(),
pd.WithGRPCDialOptions(maxCallMsgSize...),
@@ -552,13 +558,12 @@ func NewBackend(
writeLimiter = noopStoreWriteLimiter{}
}
local := &Backend{
pdCli: pdCli,
pdHTTPCli: pdHTTPCli,
splitCli: splitCli,
tikvCli: tikvCli,
tls: tls,
regionSizeGetter: regionSizeGetter,
tikvCodec: tikvCodec,
pdCli: pdCli,
pdHTTPCli: pdHTTPCli,
splitCli: splitCli,
tikvCli: tikvCli,
tls: tls,
tikvCodec: tikvCodec,

BackendConfig: config,

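
Context for the local.go hunks above: NewBackend drops the regionSizeGetter parameter and takes a pd.ServiceDiscovery instead, preferring its service URLs over the comma-separated PDAddr config. Here is a small hedged sketch isolating just that endpoint-selection rule, with a one-method interface standing in for the real pd.ServiceDiscovery.

package main

import (
	"fmt"
	"strings"
)

// serviceDiscovery stands in for the GetServiceURLs capability of
// pd.ServiceDiscovery (assumption: only this method matters here).
type serviceDiscovery interface {
	GetServiceURLs() []string
}

type staticDiscovery []string

func (s staticDiscovery) GetServiceURLs() []string { return s }

// pdAddrs mirrors the selection rule in NewBackend: prefer the discovery
// client's URLs, otherwise fall back to splitting the configured PDAddr.
func pdAddrs(disc serviceDiscovery, configuredPDAddr string) []string {
	if disc != nil {
		return disc.GetServiceURLs()
	}
	return strings.Split(configuredPDAddr, ",")
}

func main() {
	fmt.Println(pdAddrs(nil, "10.0.0.1:2379,10.0.0.2:2379"))
	fmt.Println(pdAddrs(staticDiscovery{"http://pd-0:2379"}, "ignored"))
}
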
46 changes: 0 additions & 46 deletions br/pkg/lightning/backend/local/localhelper.go
@@ -17,7 +17,6 @@ package local
import (
"bytes"
"context"
"database/sql"
"math"
"slices"
"sort"
@@ -62,51 +61,6 @@ var (
splitRetryTimes = 8
)

// TableRegionSizeGetter get table region size.
type TableRegionSizeGetter interface {
GetTableRegionSize(ctx context.Context, tableID int64) (map[uint64]int64, error)
}

// TableRegionSizeGetterImpl implements TableRegionSizeGetter.
type TableRegionSizeGetterImpl struct {
DB *sql.DB
}

var _ TableRegionSizeGetter = &TableRegionSizeGetterImpl{}

// GetTableRegionSize implements TableRegionSizeGetter.
func (g *TableRegionSizeGetterImpl) GetTableRegionSize(ctx context.Context, tableID int64) (map[uint64]int64, error) {
if g.DB == nil {
return nil, errors.Errorf("db is nil")
}
exec := &common.SQLWithRetry{
DB: g.DB,
Logger: log.FromContext(ctx),
}

stats := make(map[uint64]int64)
err := exec.Transact(ctx, "fetch region approximate sizes", func(ctx context.Context, tx *sql.Tx) error {
rows, err := tx.QueryContext(ctx, "SELECT REGION_ID, APPROXIMATE_SIZE FROM information_schema.TIKV_REGION_STATUS WHERE TABLE_ID = ?", tableID)
if err != nil {
return errors.Trace(err)
}
//nolint: errcheck
defer rows.Close()
var (
regionID uint64
size int64
)
for rows.Next() {
if err = rows.Scan(&regionID, &size); err != nil {
return errors.Trace(err)
}
stats[regionID] = size * units.MiB
}
return rows.Err()
})
return stats, errors.Trace(err)
}

// SplitAndScatterRegionInBatches splits&scatter regions in batches.
// Too many split&scatter requests may put a lot of pressure on TiKV and PD.
func (local *Backend) SplitAndScatterRegionInBatches(
15 changes: 6 additions & 9 deletions br/pkg/lightning/backend/local/localhelper_test.go
@@ -462,9 +462,8 @@ func doTestBatchSplitRegionByRanges(ctx context.Context, t *testing.T, hook clie
keys := [][]byte{[]byte(""), []byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca"), []byte("")}
client := initTestSplitClient(keys, hook)
local := &Backend{
splitCli: client,
regionSizeGetter: &TableRegionSizeGetterImpl{},
logger: log.L(),
splitCli: client,
logger: log.L(),
}
local.RegionSplitBatchSize = 4
local.RegionSplitConcurrency = 4
@@ -721,9 +720,8 @@ func TestSplitAndScatterRegionInBatches(t *testing.T) {
keys := [][]byte{[]byte(""), []byte("a"), []byte("b"), []byte("")}
client := initTestSplitClient(keys, nil)
local := &Backend{
splitCli: client,
regionSizeGetter: &TableRegionSizeGetterImpl{},
logger: log.L(),
splitCli: client,
logger: log.L(),
}
local.RegionSplitBatchSize = 4
local.RegionSplitConcurrency = 4
@@ -807,9 +805,8 @@ func doTestBatchSplitByRangesWithClusteredIndex(t *testing.T, hook clientHook) {
keys = append(keys, tableEndKey, []byte(""))
client := initTestSplitClient(keys, hook)
local := &Backend{
splitCli: client,
regionSizeGetter: &TableRegionSizeGetterImpl{},
logger: log.L(),
splitCli: client,
logger: log.L(),
}
local.RegionSplitBatchSize = 10
local.RegionSplitConcurrency = 10
