Skip to content

Commit

Permalink
dep(*): update tidb to include DM SchemaTracker fix (#11410)
Browse files Browse the repository at this point in the history
close #11408
  • Loading branch information
lance6716 authored Jul 22, 2024
1 parent 482fd8e commit 2fcbb31
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 40 deletions.
12 changes: 7 additions & 5 deletions dm/pkg/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,12 @@ func (tr *Tracker) GetTableInfo(table *filter.Table) (*model.TableInfo, error) {
if tr.closed.Load() {
return nil, dmterror.ErrSchemaTrackerIsClosed.New("fail to get table info")
}
return tr.upstreamTracker.TableByName(model.NewCIStr(table.Schema), model.NewCIStr(table.Name))
return tr.upstreamTracker.TableByName(context.Background(), model.NewCIStr(table.Schema), model.NewCIStr(table.Name))
}

// GetCreateTable returns the `CREATE TABLE` statement of the table.
func (tr *Tracker) GetCreateTable(ctx context.Context, table *filter.Table) (string, error) {
tableInfo, err := tr.upstreamTracker.TableByName(model.NewCIStr(table.Schema), model.NewCIStr(table.Name))
tableInfo, err := tr.upstreamTracker.TableByName(ctx, model.NewCIStr(table.Schema), model.NewCIStr(table.Name))
if err != nil {
return "", err
}
Expand Down Expand Up @@ -257,7 +257,7 @@ func (tr *Tracker) ListSchemaTables(schema string) ([]string, error) {
// TODO: move out of this package!
func (tr *Tracker) GetSingleColumnIndices(db, tbl, col string) ([]*model.IndexInfo, error) {
col = strings.ToLower(col)
t, err := tr.upstreamTracker.TableByName(model.NewCIStr(db), model.NewCIStr(tbl))
t, err := tr.upstreamTracker.TableByName(context.Background(), model.NewCIStr(db), model.NewCIStr(tbl))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -339,13 +339,15 @@ func (tr *Tracker) CreateTableIfNotExists(table *filter.Table, ti *model.TableIn
tableName := model.NewCIStr(table.Name)
ti = cloneTableInfo(ti)
ti.Name = tableName
return tr.upstreamTracker.CreateTableWithInfo(tr.se, schemaName, ti, nil, ddl.OnExistIgnore)
return tr.upstreamTracker.CreateTableWithInfo(tr.se, schemaName, ti, nil, ddl.WithOnExist(ddl.OnExistIgnore))
}

// SplitBatchCreateTableAndHandle will split the batch if it exceeds the kv entry size limit.
func (tr *Tracker) SplitBatchCreateTableAndHandle(schema model.CIStr, info []*model.TableInfo, l int, r int) error {
var err error
if err = tr.upstreamTracker.BatchCreateTableWithInfo(tr.se, schema, info[l:r], ddl.OnExistIgnore); kv.ErrEntryTooLarge.Equal(err) {
if err = tr.upstreamTracker.BatchCreateTableWithInfo(
tr.se, schema, info[l:r], ddl.WithOnExist(ddl.OnExistIgnore),
); kv.ErrEntryTooLarge.Equal(err) {
if r-l == 1 {
return err
}
Expand Down
1 change: 0 additions & 1 deletion dm/syncer/expr_filter_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ func (g *ExprFilterGroup) ResetExprs(table *filter.Table) {
// SkipDMLByExpression returns true when given row matches the expr, which means this row should be skipped.
func SkipDMLByExpression(ctx sessionctx.Context, row []interface{}, expr expression.Expression, upstreamCols []*model.ColumnInfo) (bool, error) {
// TODO: add MetricsProxies
log.L().Debug("will evaluate the expression", zap.Stringer("expression", expr), zap.Any("raw row", row))
data, err := utils.AdjustBinaryProtocolForDatum(ctx, row, upstreamCols)
if err != nil {
return false, err
Expand Down
3 changes: 2 additions & 1 deletion dm/syncer/expr_filter_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"testing"

ddl2 "github.com/pingcap/tidb/pkg/ddl"
context2 "github.com/pingcap/tidb/pkg/expression/context"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/util/filter"
"github.com/pingcap/tiflow/dm/config"
Expand Down Expand Up @@ -439,7 +440,7 @@ create table t (
require.NoError(t, err)
require.Len(t, exprs, 1)
expr := exprs[0]
require.Equal(t, "0", expr.String())
require.Equal(t, "0", expr.StringWithCtx(context2.EmptyParamValues))

// skip nothing
skip, err := SkipDMLByExpression(sessCtx, []interface{}{0}, expr, ti.Columns)
Expand Down
23 changes: 12 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ require (
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4
github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d
github.com/pingcap/tidb v1.1.0-beta.0.20240627074325-184b010f800a
github.com/pingcap/tidb v1.1.0-beta.0.20240722024203-504960d51b2a
github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7
github.com/pingcap/tidb/pkg/parser v0.0.0-20240627074325-184b010f800a
github.com/prometheus/client_golang v1.19.0
github.com/pingcap/tidb/pkg/parser v0.0.0-20240722015532-8edd4ed54376
github.com/prometheus/client_golang v1.19.1
github.com/prometheus/client_model v0.6.1
github.com/r3labs/diff v1.1.0
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
Expand All @@ -89,9 +89,9 @@ require (
github.com/swaggo/swag v1.16.3
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954
github.com/thanhpk/randstr v1.0.6
github.com/tikv/client-go/v2 v2.0.8-0.20240626064248-4a72526f6c30
github.com/tikv/client-go/v2 v2.0.8-0.20240703095801-d73cc1ed6503
github.com/tikv/pd v1.1.0-beta.0.20240407022249-7179657d129b
github.com/tikv/pd/client v0.0.0-20240620115049-049de1761e56
github.com/tikv/pd/client v0.0.0-20240717053728-5ec6af403019
github.com/tinylib/msgp v1.1.6
github.com/uber-go/atomic v1.4.0
github.com/vmihailenco/msgpack/v5 v5.3.5
Expand All @@ -111,16 +111,16 @@ require (
go.uber.org/ratelimit v0.2.0
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20240604190554-fc45aab8b7f8
golang.org/x/net v0.26.0
golang.org/x/net v0.27.0
golang.org/x/oauth2 v0.21.0
golang.org/x/sync v0.7.0
golang.org/x/sys v0.21.0
golang.org/x/sys v0.22.0
golang.org/x/text v0.16.0
golang.org/x/time v0.5.0
google.golang.org/genproto/googleapis/api v0.0.0-20240401170217-c3f982113cda
google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291
google.golang.org/grpc v1.64.0
google.golang.org/protobuf v1.34.1
google.golang.org/protobuf v1.34.2
gopkg.in/yaml.v2 v2.4.0
gorm.io/driver/mysql v1.4.5
gorm.io/gorm v1.24.5
Expand Down Expand Up @@ -174,6 +174,7 @@ require (
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/otiai10/copy v1.2.0 // indirect
github.com/qri-io/jsonpointer v0.1.1 // indirect
github.com/qri-io/jsonschema v0.2.1 // indirect
Expand Down Expand Up @@ -332,7 +333,7 @@ require (
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect
github.com/prometheus/common v0.53.0 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/rivo/uniseg v0.4.7 // indirect
Expand Down Expand Up @@ -379,9 +380,9 @@ require (
go.opentelemetry.io/otel/sdk v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
go.opentelemetry.io/proto/otlp v1.1.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/mod v0.18.0 // indirect
golang.org/x/term v0.21.0
golang.org/x/term v0.22.0
golang.org/x/tools v0.22.0 // indirect
google.golang.org/api v0.170.0 // indirect
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 // indirect
Expand Down
Loading

0 comments on commit 2fcbb31

Please sign in to comment.