Skip to content

Commit

Permalink
Merge branch 'release-5.2' into batch-cherrypick-defaultvalue
Browse files Browse the repository at this point in the history
  • Loading branch information
maxshuang authored Apr 14, 2022
2 parents a550c17 + 96dc222 commit 418daf4
Show file tree
Hide file tree
Showing 49 changed files with 406 additions and 237 deletions.
11 changes: 9 additions & 2 deletions cdc/capture/http_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func verifyCreateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch
}

if !replicaConfig.ForceReplicate && !changefeedConfig.IgnoreIneligibleTable {
ineligibleTables, _, err := verifyTables(replicaConfig, capture.kvStorage, changefeedConfig.StartTS)
ineligibleTables, _, err := VerifyTables(replicaConfig, capture.kvStorage, changefeedConfig.StartTS)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -201,7 +201,9 @@ func verifyUpdateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch
return newInfo, nil
}

func verifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, startTs uint64) (ineligibleTables, eligibleTables []model.TableName, err error) {
// VerifyTables catalog tables specified by ReplicaConfig into
// eligible (has an unique index or primary key) and ineligible tables.
func VerifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, startTs uint64) (ineligibleTables, eligibleTables []model.TableName, err error) {
filter, err := filter.NewFilter(replicaConfig)
if err != nil {
return nil, nil, errors.Trace(err)
Expand All @@ -219,6 +221,11 @@ func verifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, s
if filter.ShouldIgnoreTable(tableInfo.TableName.Schema, tableInfo.TableName.Table) {
continue
}
// Sequence is not supported yet, TiCDC needs to filter all sequence tables.
// See https://github.com/pingcap/tiflow/issues/4559
if tableInfo.IsSequence() {
continue
}
if !tableInfo.IsEligible(false /* forceReplicate */) {
ineligibleTables = append(ineligibleTables, tableInfo.TableName)
} else {
Expand Down
6 changes: 2 additions & 4 deletions cdc/entry/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ import (

func Test(t *testing.T) { check.TestingT(t) }

type codecSuite struct {
}
type codecSuite struct{}

var _ = check.Suite(&codecSuite{})

Expand All @@ -43,8 +42,7 @@ func (s *codecSuite) TestDecodeRecordKey(c *check.C) {
c.Assert(len(key), check.Equals, 0)
}

type decodeMetaKeySuite struct {
}
type decodeMetaKeySuite struct{}

var _ = check.Suite(&decodeMetaKeySuite{})

Expand Down
3 changes: 2 additions & 1 deletion cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ func testMounterDisableOldValue(c *check.C, tc struct {
tableName string
createTableDDL string
values [][]interface{}
}) {
},
) {
store, err := mockstore.NewMockStore()
c.Assert(err, check.IsNil)
defer store.Close() //nolint:errcheck
Expand Down
52 changes: 32 additions & 20 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ type schemaSnapshot struct {

currentTs uint64

// if explicit is true, treat tables without explicit row id as eligible
explicitTables bool
// if forceReplicate is true, treat ineligible tables as eligible.
forceReplicate bool
}

// SingleSchemaSnapshot is a single schema snapshot independent of schema storage
Expand Down Expand Up @@ -98,17 +98,17 @@ func (s *SingleSchemaSnapshot) PreTableInfo(job *timodel.Job) (*model.TableInfo,
}

// NewSingleSchemaSnapshotFromMeta creates a new single schema snapshot from a tidb meta
func NewSingleSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, explicitTables bool) (*SingleSchemaSnapshot, error) {
func NewSingleSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, forceReplicate bool) (*SingleSchemaSnapshot, error) {
// meta is nil only in unit tests
if meta == nil {
snap := newEmptySchemaSnapshot(explicitTables)
snap := newEmptySchemaSnapshot(forceReplicate)
snap.currentTs = currentTs
return snap, nil
}
return newSchemaSnapshotFromMeta(meta, currentTs, explicitTables)
return newSchemaSnapshotFromMeta(meta, currentTs, forceReplicate)
}

func newEmptySchemaSnapshot(explicitTables bool) *schemaSnapshot {
func newEmptySchemaSnapshot(forceReplicate bool) *schemaSnapshot {
return &schemaSnapshot{
tableNameToID: make(map[model.TableName]int64),
schemaNameToID: make(map[string]int64),
Expand All @@ -121,12 +121,12 @@ func newEmptySchemaSnapshot(explicitTables bool) *schemaSnapshot {
truncateTableID: make(map[int64]struct{}),
ineligibleTableID: make(map[int64]struct{}),

explicitTables: explicitTables,
forceReplicate: forceReplicate,
}
}

func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, explicitTables bool) (*schemaSnapshot, error) {
snap := newEmptySchemaSnapshot(explicitTables)
func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, forceReplicate bool) (*schemaSnapshot, error) {
snap := newEmptySchemaSnapshot(forceReplicate)
dbinfos, err := meta.ListDatabases()
if err != nil {
return nil, cerror.WrapError(cerror.ErrMetaListDatabases, err)
Expand All @@ -146,7 +146,7 @@ func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, explicitTabl
tableInfo := model.WrapTableInfo(dbinfo.ID, dbinfo.Name.O, currentTs, tableInfo)
snap.tables[tableInfo.ID] = tableInfo
snap.tableNameToID[model.TableName{Schema: dbinfo.Name.O, Table: tableInfo.Name.O}] = tableInfo.ID
isEligible := tableInfo.IsEligible(explicitTables)
isEligible := tableInfo.IsEligible(forceReplicate)
if !isEligible {
snap.ineligibleTableID[tableInfo.ID] = struct{}{}
}
Expand Down Expand Up @@ -468,7 +468,7 @@ func (s *schemaSnapshot) updatePartition(tbl *model.TableInfo) error {
zap.Int64("add partition id", partition.ID))
}
s.partitionTable[partition.ID] = tbl
if !tbl.IsEligible(s.explicitTables) {
if !tbl.IsEligible(s.forceReplicate) {
s.ineligibleTableID[partition.ID] = struct{}{}
}
delete(oldIDs, partition.ID)
Expand Down Expand Up @@ -504,14 +504,20 @@ func (s *schemaSnapshot) createTable(table *model.TableInfo) error {
s.tableInSchema[table.SchemaID] = tableInSchema

s.tables[table.ID] = table
if !table.IsEligible(s.explicitTables) {
log.Warn("this table is not eligible to replicate", zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID))
if !table.IsEligible(s.forceReplicate) {
// Sequence is not supported yet, and always ineligible.
// Skip Warn to avoid confusion.
// See https://github.com/pingcap/tiflow/issues/4559
if !table.IsSequence() {
log.Warn("this table is ineligible to replicate",
zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID))
}
s.ineligibleTableID[table.ID] = struct{}{}
}
if pi := table.GetPartitionInfo(); pi != nil {
for _, partition := range pi.Definitions {
s.partitionTable[partition.ID] = table
if !table.IsEligible(s.explicitTables) {
if !table.IsEligible(s.forceReplicate) {
s.ineligibleTableID[partition.ID] = struct{}{}
}
}
Expand All @@ -529,14 +535,20 @@ func (s *schemaSnapshot) replaceTable(table *model.TableInfo) error {
return cerror.ErrSnapshotTableNotFound.GenWithStack("table %s(%d)", table.Name, table.ID)
}
s.tables[table.ID] = table
if !table.IsEligible(s.explicitTables) {
log.Warn("this table is not eligible to replicate", zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID))
if !table.IsEligible(s.forceReplicate) {
// Sequence is not supported yet, and always ineligible.
// Skip Warn to avoid confusion.
// See https://github.com/pingcap/tiflow/issues/4559
if !table.IsSequence() {
log.Warn("this table is ineligible to replicate",
zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID))
}
s.ineligibleTableID[table.ID] = struct{}{}
}
if pi := table.GetPartitionInfo(); pi != nil {
for _, partition := range pi.Definitions {
s.partitionTable[partition.ID] = table
if !table.IsEligible(s.explicitTables) {
if !table.IsEligible(s.forceReplicate) {
s.ineligibleTableID[partition.ID] = struct{}{}
}
}
Expand Down Expand Up @@ -673,7 +685,7 @@ type schemaStorageImpl struct {
resolvedTs uint64

filter *filter.Filter
explicitTables bool
forceReplicate bool
}

// NewSchemaStorage creates a new schema storage
Expand All @@ -692,7 +704,7 @@ func NewSchemaStorage(meta *timeta.Meta, startTs uint64, filter *filter.Filter,
snaps: []*schemaSnapshot{snap},
resolvedTs: startTs,
filter: filter,
explicitTables: forceReplicate,
forceReplicate: forceReplicate,
}
return schema, nil
}
Expand Down Expand Up @@ -769,7 +781,7 @@ func (s *schemaStorageImpl) HandleDDLJob(job *timodel.Job) error {
}
snap = lastSnap.Clone()
} else {
snap = newEmptySchemaSnapshot(s.explicitTables)
snap = newEmptySchemaSnapshot(s.forceReplicate)
}
if err := snap.handleDDL(job); err != nil {
return errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion cdc/entry/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ func (t *schemaSuite) TestSnapshotClone(c *check.C) {
c.Assert(clone.truncateTableID, check.DeepEquals, snap.truncateTableID)
c.Assert(clone.ineligibleTableID, check.DeepEquals, snap.ineligibleTableID)
c.Assert(clone.currentTs, check.Equals, snap.currentTs)
c.Assert(clone.explicitTables, check.Equals, snap.explicitTables)
c.Assert(clone.forceReplicate, check.Equals, snap.forceReplicate)
c.Assert(len(clone.tables), check.Equals, len(snap.tables))
c.Assert(len(clone.schemas), check.Equals, len(snap.schemas))
c.Assert(len(clone.partitionTable), check.Equals, len(snap.partitionTable))
Expand Down
3 changes: 1 addition & 2 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ func Test(t *testing.T) {
check.TestingT(t)
}

type clientSuite struct {
}
type clientSuite struct{}

var _ = check.Suite(&clientSuite{})

Expand Down
3 changes: 1 addition & 2 deletions cdc/kv/resolvedts_heap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import (
"github.com/pingcap/tiflow/pkg/util/testleak"
)

type rtsHeapSuite struct {
}
type rtsHeapSuite struct{}

var _ = check.Suite(&rtsHeapSuite{})

Expand Down
3 changes: 1 addition & 2 deletions cdc/kv/token_region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ import (
"golang.org/x/sync/errgroup"
)

type tokenRegionSuite struct {
}
type tokenRegionSuite struct{}

var _ = check.Suite(&tokenRegionSuite{})

Expand Down
1 change: 0 additions & 1 deletion cdc/model/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ var _ = check.Suite(&taskStatusSuite{})
func (s *taskStatusSuite) TestShouldBeDeepCopy(c *check.C) {
defer testleak.AfterTest(c)()
info := TaskStatus{

Tables: map[TableID]*TableReplicaInfo{
1: {StartTs: 100},
2: {StartTs: 100},
Expand Down
5 changes: 5 additions & 0 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,11 @@ func (ti *TableInfo) ExistTableUniqueColumn() bool {

// IsEligible returns whether the table is a eligible table
func (ti *TableInfo) IsEligible(forceReplicate bool) bool {
// Sequence is not supported yet, TiCDC needs to filter all sequence tables.
// See https://github.com/pingcap/tiflow/issues/4559
if ti.IsSequence() {
return false
}
if forceReplicate {
return true
}
Expand Down
10 changes: 10 additions & 0 deletions cdc/model/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,9 +401,19 @@ func (s *schemaStorageSuite) TestTableInfoGetterFuncs(c *check.C) {
info = WrapTableInfo(1, "test", 0, &t)
c.Assert(info.IsEligible(false), check.IsFalse)
c.Assert(info.IsEligible(true), check.IsTrue)

// View is eligible.
t.View = &timodel.ViewInfo{}
info = WrapTableInfo(1, "test", 0, &t)
c.Assert(info.IsView(), check.IsTrue)
c.Assert(info.IsEligible(false), check.IsTrue)

// Sequence is ineligible.
t.Sequence = &timodel.SequenceInfo{}
info = WrapTableInfo(1, "test", 0, &t)
c.Assert(info.IsSequence(), check.IsTrue)
c.Assert(info.IsEligible(false), check.IsFalse)
c.Assert(info.IsEligible(true), check.IsFalse)
}

func (s *schemaStorageSuite) TestTableInfoClone(c *check.C) {
Expand Down
3 changes: 1 addition & 2 deletions cdc/owner/barrier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ func Test(t *testing.T) { check.TestingT(t) }

var _ = check.Suite(&barrierSuite{})

type barrierSuite struct {
}
type barrierSuite struct{}

func (s *barrierSuite) TestBarrier(c *check.C) {
defer testleak.AfterTest(c)()
Expand Down
6 changes: 3 additions & 3 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ func (m *mockDDLSink) Barrier(ctx context.Context) error {

var _ = check.Suite(&changefeedSuite{})

type changefeedSuite struct {
}
type changefeedSuite struct{}

func createChangefeed4Test(ctx cdcContext.Context, c *check.C) (*changefeed, *model.ChangefeedReactorState,
map[model.CaptureID]*model.CaptureInfo, *orchestrator.ReactorStateTester) {
map[model.CaptureID]*model.CaptureInfo, *orchestrator.ReactorStateTester,
) {
ctx.GlobalVars().PDClient = &gc.MockPDClient{
UpdateServiceGCSafePointFunc: func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
return safePoint, nil
Expand Down
3 changes: 1 addition & 2 deletions cdc/owner/ddl_puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ import (

var _ = check.Suite(&ddlPullerSuite{})

type ddlPullerSuite struct {
}
type ddlPullerSuite struct{}

type mockPuller struct {
c *check.C
Expand Down
3 changes: 1 addition & 2 deletions cdc/owner/feed_state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import (

var _ = check.Suite(&feedStateManagerSuite{})

type feedStateManagerSuite struct {
}
type feedStateManagerSuite struct{}

func (s *feedStateManagerSuite) TestHandleJob(c *check.C) {
defer testleak.AfterTest(c)()
Expand Down
Loading

0 comments on commit 418daf4

Please sign in to comment.