Skip to content

Commit

Permalink
Merge branch 'master' into max_connections
Browse files Browse the repository at this point in the history
  • Loading branch information
CbcWestwolf authored Jun 22, 2022
2 parents d672462 + 494759d commit a975923
Show file tree
Hide file tree
Showing 58 changed files with 576 additions and 2,176 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ The [community repository](https://github.com/pingcap/community) hosts all infor

[<img src="docs/contribution-map.png" alt="contribution-map" width="180">](https://github.com/pingcap/tidb-map/blob/master/maps/contribution-map.md#tidb-is-an-open-source-distributed-htap-database-compatible-with-the-mysql-protocol)

Contributions are welcomed and greatly appreciated. See [Contribution to TiDB](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/introduction.html) for details on typical contribution workflows. For more contributing information, click on the contributor icon above.
Contributions are welcomed and greatly appreciated. All the contributors are welcomed to claim your reward by filing this [form](https://forms.pingcap.com/f/tidb-contribution-swag). See [Contribution to TiDB](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/introduction.html) for details on typical contribution workflows. For more contributing information, click on the contributor icon above.

## Adopters

Expand Down
16 changes: 8 additions & 8 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ func makeTag(tableName string, engineID int32) string {
return fmt.Sprintf("%s:%d", tableName, engineID)
}

func makeLogger(tag string, engineUUID uuid.UUID) log.Logger {
return log.With(
func makeLogger(logger log.Logger, tag string, engineUUID uuid.UUID) log.Logger {
return logger.With(
zap.String("engineTag", tag),
zap.Stringer("engineUUID", engineUUID),
)
Expand Down Expand Up @@ -143,7 +143,7 @@ type AbstractBackend interface {
ShouldPostProcess() bool

// NewEncoder creates an encoder of a TiDB table.
NewEncoder(tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error)
NewEncoder(ctx context.Context, tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error)

OpenEngine(ctx context.Context, config *EngineConfig, engineUUID uuid.UUID) error

Expand Down Expand Up @@ -260,8 +260,8 @@ func (be Backend) MakeEmptyRows() kv.Rows {
return be.abstract.MakeEmptyRows()
}

func (be Backend) NewEncoder(tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error) {
return be.abstract.NewEncoder(tbl, options)
func (be Backend) NewEncoder(ctx context.Context, tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error) {
return be.abstract.NewEncoder(ctx, tbl, options)
}

func (be Backend) ShouldPostProcess() bool {
Expand Down Expand Up @@ -321,7 +321,7 @@ func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID
closedEngine := ClosedEngine{
engine: engine{
backend: be.abstract,
logger: makeLogger("<import-and-reset>", engineUUID),
logger: makeLogger(log.FromContext(ctx), "<import-and-reset>", engineUUID),
uuid: engineUUID,
},
}
Expand All @@ -334,7 +334,7 @@ func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID
// OpenEngine opens an engine with the given table name and engine ID.
func (be Backend) OpenEngine(ctx context.Context, config *EngineConfig, tableName string, engineID int32) (*OpenedEngine, error) {
tag, engineUUID := MakeUUID(tableName, engineID)
logger := makeLogger(tag, engineUUID)
logger := makeLogger(log.FromContext(ctx), tag, engineUUID)

if err := be.abstract.OpenEngine(ctx, config, engineUUID); err != nil {
return nil, err
Expand Down Expand Up @@ -437,7 +437,7 @@ func (be Backend) UnsafeCloseEngine(ctx context.Context, cfg *EngineConfig, tabl
func (be Backend) UnsafeCloseEngineWithUUID(ctx context.Context, cfg *EngineConfig, tag string, engineUUID uuid.UUID) (*ClosedEngine, error) {
return engine{
backend: be.abstract,
logger: makeLogger(tag, engineUUID),
logger: makeLogger(log.FromContext(ctx), tag, engineUUID),
uuid: engineUUID,
}.unsafeClose(ctx, cfg)
}
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,9 @@ func TestNewEncoder(t *testing.T) {

encoder := mock.NewMockEncoder(s.controller)
options := &kv.SessionOptions{SQLMode: mysql.ModeANSIQuotes, Timestamp: 1234567890}
s.mockBackend.EXPECT().NewEncoder(nil, options).Return(encoder, nil)
s.mockBackend.EXPECT().NewEncoder(nil, nil, options).Return(encoder, nil)

realEncoder, err := s.mockBackend.NewEncoder(nil, options)
realEncoder, err := s.mockBackend.NewEncoder(nil, nil, options)
require.Equal(t, realEncoder, encoder)
require.NoError(t, err)
}
Expand Down
10 changes: 8 additions & 2 deletions br/pkg/lightning/backend/kv/kv2sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package kv
import (
"fmt"

"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/table"
Expand Down Expand Up @@ -104,8 +105,13 @@ func (t *TableKVDecoder) IterRawIndexKeys(h kv.Handle, rawRow []byte, fn func([]
return nil
}

func NewTableKVDecoder(tbl table.Table, tableName string, options *SessionOptions) (*TableKVDecoder, error) {
se := newSession(options)
func NewTableKVDecoder(
tbl table.Table,
tableName string,
options *SessionOptions,
logger log.Logger,
) (*TableKVDecoder, error) {
se := newSession(options, logger)
cols := tbl.Cols()
// Set CommonAddRecordCtx to session to reuse the slices and BufStore in AddRecord
recordCtx := tables.NewCommonAddRecordCtx(len(cols))
Expand Down
10 changes: 5 additions & 5 deletions br/pkg/lightning/backend/kv/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,11 @@ type SessionOptions struct {
}

// NewSession creates a new trimmed down Session matching the options.
func NewSession(options *SessionOptions) sessionctx.Context {
return newSession(options)
func NewSession(options *SessionOptions, logger log.Logger) sessionctx.Context {
return newSession(options, logger)
}

func newSession(options *SessionOptions) *session {
func newSession(options *SessionOptions, logger log.Logger) *session {
sqlMode := options.SQLMode
vars := variable.NewSessionVars()
vars.SkipUTF8Check = true
Expand All @@ -265,15 +265,15 @@ func newSession(options *SessionOptions) *session {
if options.SysVars != nil {
for k, v := range options.SysVars {
if err := vars.SetSystemVar(k, v); err != nil {
log.L().DPanic("new session: failed to set system var",
logger.DPanic("new session: failed to set system var",
log.ShortError(err),
zap.String("key", k))
}
}
}
vars.StmtCtx.TimeZone = vars.Location()
if err := vars.SetSystemVar("timestamp", strconv.FormatInt(options.Timestamp, 10)); err != nil {
log.L().Warn("new session: failed to set timestamp",
logger.Warn("new session: failed to set timestamp",
log.ShortError(err))
}
vars.TxnCtx = nil
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/kv/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ package kv
import (
"testing"

"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/parser/mysql"
"github.com/stretchr/testify/require"
)

func TestSession(t *testing.T) {
session := newSession(&SessionOptions{SQLMode: mysql.ModeNone, Timestamp: 1234567890})
session := newSession(&SessionOptions{SQLMode: mysql.ModeNone, Timestamp: 1234567890}, log.L())
_, err := session.Txn(true)
require.NoError(t, err)
}
11 changes: 8 additions & 3 deletions br/pkg/lightning/backend/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,18 @@ type tableKVEncoder struct {
metrics *metric.Metrics
}

func NewTableKVEncoder(tbl table.Table, options *SessionOptions, metrics *metric.Metrics) (Encoder, error) {
func NewTableKVEncoder(
tbl table.Table,
options *SessionOptions,
metrics *metric.Metrics,
logger log.Logger,
) (Encoder, error) {
if metrics != nil {
metrics.KvEncoderCounter.WithLabelValues("open").Inc()
}
meta := tbl.Meta()
cols := tbl.Cols()
se := newSession(options)
se := newSession(options, logger)
// Set CommonAddRecordCtx to session to reuse the slices and BufStore in AddRecord
recordCtx := tables.NewCommonAddRecordCtx(len(cols))
tables.SetAddRecordCtx(se, recordCtx)
Expand Down Expand Up @@ -267,7 +272,7 @@ func logKVConvertFailed(logger log.Logger, row []types.Datum, j int, colInfo *mo
log.ShortError(err),
)

log.L().Error("failed to covert kv value", logutil.RedactAny("origVal", original.GetValue()),
logger.Error("failed to convert kv value", logutil.RedactAny("origVal", original.GetValue()),
zap.Stringer("fieldType", &colInfo.FieldType), zap.String("column", colInfo.Name.O),
zap.Int("columnID", j+1))
return errors.Annotatef(
Expand Down
26 changes: 13 additions & 13 deletions br/pkg/lightning/backend/kv/sql2kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestEncode(t *testing.T) {
strictMode, err := NewTableKVEncoder(tbl, &SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
Timestamp: 1234567890,
}, nil)
}, nil, logger)
require.NoError(t, err)
pairs, err := strictMode.Encode(logger, rows, 1, []int{0, 1}, "1.csv", 1234)
require.Regexp(t, "failed to cast value as tinyint\\(4\\) for column `c1` \\(#1\\):.*overflows tinyint", err)
Expand Down Expand Up @@ -121,7 +121,7 @@ func TestEncode(t *testing.T) {
mockMode, err := NewTableKVEncoder(mockTbl, &SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
Timestamp: 1234567891,
}, nil)
}, nil, logger)
require.NoError(t, err)
_, err = mockMode.Encode(logger, rowsWithPk2, 2, []int{0, 1}, "1.csv", 1234)
require.EqualError(t, err, "mock error")
Expand All @@ -131,7 +131,7 @@ func TestEncode(t *testing.T) {
SQLMode: mysql.ModeNone,
Timestamp: 1234567892,
SysVars: map[string]string{"tidb_row_format_version": "1"},
}, nil)
}, nil, logger)
require.NoError(t, err)
pairs, err = noneMode.Encode(logger, rows, 1, []int{0, 1}, "1.csv", 1234)
require.NoError(t, err)
Expand All @@ -153,7 +153,7 @@ func TestDecode(t *testing.T) {
decoder, err := NewTableKVDecoder(tbl, "`test`.`c1`", &SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
Timestamp: 1234567890,
})
}, log.L())
require.NoError(t, err)
require.NotNil(t, decoder)
require.Equal(t, decoder.Name(), "`test`.`c1`")
Expand Down Expand Up @@ -208,7 +208,7 @@ func TestDecodeIndex(t *testing.T) {
strictMode, err := NewTableKVEncoder(tbl, &SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
Timestamp: 1234567890,
}, nil)
}, nil, log.L())
require.NoError(t, err)
pairs, err := strictMode.Encode(logger, rows, 1, []int{0, 1, -1}, "1.csv", 123)
data := pairs.(*KvPairs)
Expand All @@ -217,7 +217,7 @@ func TestDecodeIndex(t *testing.T) {
decoder, err := NewTableKVDecoder(tbl, "`test`.``", &SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
Timestamp: 1234567890,
})
}, log.L())
require.NoError(t, err)
h1, err := decoder.DecodeHandleFromRowKey(data.pairs[0].Key)
require.NoError(t, err)
Expand Down Expand Up @@ -247,7 +247,7 @@ func TestEncodeRowFormatV2(t *testing.T) {
SQLMode: mysql.ModeNone,
Timestamp: 1234567892,
SysVars: map[string]string{"tidb_row_format_version": "2"},
}, nil)
}, nil, log.L())
require.NoError(t, err)
pairs, err := noneMode.Encode(logger, rows, 1, []int{0, 1}, "1.csv", 1234)
require.NoError(t, err)
Expand Down Expand Up @@ -295,7 +295,7 @@ func TestEncodeTimestamp(t *testing.T) {
"tidb_row_format_version": "1",
"time_zone": "+08:00",
},
}, nil)
}, nil, log.L())
require.NoError(t, err)
pairs, err := encoder.Encode(logger, nil, 70, []int{-1, 1}, "1.csv", 1234)
require.NoError(t, err)
Expand All @@ -320,7 +320,7 @@ func TestEncodeDoubleAutoIncrement(t *testing.T) {
SysVars: map[string]string{
"tidb_row_format_version": "2",
},
}, nil)
}, nil, log.L())
require.NoError(t, err)

strDatumForID := types.NewStringDatum("1")
Expand Down Expand Up @@ -386,7 +386,7 @@ func TestEncodeMissingAutoValue(t *testing.T) {
SysVars: map[string]string{
"tidb_row_format_version": "2",
},
}, nil)
}, nil, log.L())
require.NoError(t, err)

realRowID := encoder.(*tableKVEncoder).autoIDFn(rowID)
Expand Down Expand Up @@ -447,7 +447,7 @@ func TestDefaultAutoRandoms(t *testing.T) {
Timestamp: 1234567893,
SysVars: map[string]string{"tidb_row_format_version": "2"},
AutoRandomSeed: 456,
}, nil)
}, nil, log.L())
require.NoError(t, err)
logger := log.Logger{Logger: zap.NewNop()}
pairs, err := encoder.Encode(logger, []types.Datum{types.NewStringDatum("")}, 70, []int{-1, 0}, "1.csv", 1234)
Expand Down Expand Up @@ -482,7 +482,7 @@ func TestShardRowId(t *testing.T) {
Timestamp: 1234567893,
SysVars: map[string]string{"tidb_row_format_version": "2"},
AutoRandomSeed: 456,
}, nil)
}, nil, log.L())
require.NoError(t, err)
logger := log.Logger{Logger: zap.NewNop()}
keyMap := make(map[int64]struct{}, 16)
Expand Down Expand Up @@ -636,7 +636,7 @@ func SetUpTest(b *testing.B) *benchSQL2KVSuite {
// Construct the corresponding KV encoder.
tbl, err := tables.TableFromMeta(NewPanickingAllocators(0), tableInfo)
require.NoError(b, err)
encoder, err := NewTableKVEncoder(tbl, &SessionOptions{SysVars: map[string]string{"tidb_row_format_version": "2"}}, nil)
encoder, err := NewTableKVEncoder(tbl, &SessionOptions{SysVars: map[string]string{"tidb_row_format_version": "2"}}, nil, log.L())
require.NoError(b, err)
logger := log.Logger{Logger: zap.NewNop()}

Expand Down
5 changes: 3 additions & 2 deletions br/pkg/lightning/backend/local/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,12 +411,13 @@ func NewDuplicateManager(
sessOpts *kv.SessionOptions,
concurrency int,
hasDupe *atomic.Bool,
logger log.Logger,
) (*DuplicateManager, error) {
decoder, err := kv.NewTableKVDecoder(tbl, tableName, sessOpts)
logger = logger.With(zap.String("tableName", tableName))
decoder, err := kv.NewTableKVDecoder(tbl, tableName, sessOpts, logger)
if err != nil {
return nil, errors.Trace(err)
}
logger := log.With(zap.String("tableName", tableName))
return &DuplicateManager{
tbl: tbl,
tableName: tableName,
Expand Down
Loading

0 comments on commit a975923

Please sign in to comment.