diff --git a/cdc/capture/http_validator.go b/cdc/capture/http_validator.go index 3e2b60b825d..66b3a24edfc 100644 --- a/cdc/capture/http_validator.go +++ b/cdc/capture/http_validator.go @@ -128,7 +128,7 @@ func verifyCreateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch } if !replicaConfig.ForceReplicate && !changefeedConfig.IgnoreIneligibleTable { - ineligibleTables, _, err := verifyTables(replicaConfig, capture.kvStorage, changefeedConfig.StartTS) + ineligibleTables, _, err := VerifyTables(replicaConfig, capture.kvStorage, changefeedConfig.StartTS) if err != nil { return nil, err } @@ -201,7 +201,9 @@ func verifyUpdateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch return newInfo, nil } -func verifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, startTs uint64) (ineligibleTables, eligibleTables []model.TableName, err error) { +// VerifyTables catalog tables specified by ReplicaConfig into +// eligible (has an unique index or primary key) and ineligible tables. +func VerifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, startTs uint64) (ineligibleTables, eligibleTables []model.TableName, err error) { filter, err := filter.NewFilter(replicaConfig) if err != nil { return nil, nil, errors.Trace(err) @@ -219,6 +221,11 @@ func verifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, s if filter.ShouldIgnoreTable(tableInfo.TableName.Schema, tableInfo.TableName.Table) { continue } + // Sequence is not supported yet, TiCDC needs to filter all sequence tables. + // See https://github.com/pingcap/tiflow/issues/4559 + if tableInfo.IsSequence() { + continue + } if !tableInfo.IsEligible(false /* forceReplicate */) { ineligibleTables = append(ineligibleTables, tableInfo.TableName) } else { diff --git a/cdc/entry/codec_test.go b/cdc/entry/codec_test.go index 4b8ceffd4f7..eb04638b218 100644 --- a/cdc/entry/codec_test.go +++ b/cdc/entry/codec_test.go @@ -25,8 +25,7 @@ import ( func Test(t *testing.T) { check.TestingT(t) } -type codecSuite struct { -} +type codecSuite struct{} var _ = check.Suite(&codecSuite{}) @@ -43,8 +42,7 @@ func (s *codecSuite) TestDecodeRecordKey(c *check.C) { c.Assert(len(key), check.Equals, 0) } -type decodeMetaKeySuite struct { -} +type decodeMetaKeySuite struct{} var _ = check.Suite(&decodeMetaKeySuite{}) diff --git a/cdc/entry/mounter_test.go b/cdc/entry/mounter_test.go index 1fd61e19efb..79f66111f30 100644 --- a/cdc/entry/mounter_test.go +++ b/cdc/entry/mounter_test.go @@ -220,7 +220,8 @@ func testMounterDisableOldValue(c *check.C, tc struct { tableName string createTableDDL string values [][]interface{} -}) { +}, +) { store, err := mockstore.NewMockStore() c.Assert(err, check.IsNil) defer store.Close() //nolint:errcheck diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index a23cb651bec..cd039bcc21d 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -50,8 +50,8 @@ type schemaSnapshot struct { currentTs uint64 - // if explicit is true, treat tables without explicit row id as eligible - explicitTables bool + // if forceReplicate is true, treat ineligible tables as eligible. 
+ forceReplicate bool } // SingleSchemaSnapshot is a single schema snapshot independent of schema storage @@ -98,17 +98,17 @@ func (s *SingleSchemaSnapshot) PreTableInfo(job *timodel.Job) (*model.TableInfo, } // NewSingleSchemaSnapshotFromMeta creates a new single schema snapshot from a tidb meta -func NewSingleSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, explicitTables bool) (*SingleSchemaSnapshot, error) { +func NewSingleSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, forceReplicate bool) (*SingleSchemaSnapshot, error) { // meta is nil only in unit tests if meta == nil { - snap := newEmptySchemaSnapshot(explicitTables) + snap := newEmptySchemaSnapshot(forceReplicate) snap.currentTs = currentTs return snap, nil } - return newSchemaSnapshotFromMeta(meta, currentTs, explicitTables) + return newSchemaSnapshotFromMeta(meta, currentTs, forceReplicate) } -func newEmptySchemaSnapshot(explicitTables bool) *schemaSnapshot { +func newEmptySchemaSnapshot(forceReplicate bool) *schemaSnapshot { return &schemaSnapshot{ tableNameToID: make(map[model.TableName]int64), schemaNameToID: make(map[string]int64), @@ -121,12 +121,12 @@ func newEmptySchemaSnapshot(explicitTables bool) *schemaSnapshot { truncateTableID: make(map[int64]struct{}), ineligibleTableID: make(map[int64]struct{}), - explicitTables: explicitTables, + forceReplicate: forceReplicate, } } -func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, explicitTables bool) (*schemaSnapshot, error) { - snap := newEmptySchemaSnapshot(explicitTables) +func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, forceReplicate bool) (*schemaSnapshot, error) { + snap := newEmptySchemaSnapshot(forceReplicate) dbinfos, err := meta.ListDatabases() if err != nil { return nil, cerror.WrapError(cerror.ErrMetaListDatabases, err) @@ -146,7 +146,7 @@ func newSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, explicitTabl tableInfo := model.WrapTableInfo(dbinfo.ID, dbinfo.Name.O, currentTs, tableInfo) snap.tables[tableInfo.ID] = tableInfo snap.tableNameToID[model.TableName{Schema: dbinfo.Name.O, Table: tableInfo.Name.O}] = tableInfo.ID - isEligible := tableInfo.IsEligible(explicitTables) + isEligible := tableInfo.IsEligible(forceReplicate) if !isEligible { snap.ineligibleTableID[tableInfo.ID] = struct{}{} } @@ -468,7 +468,7 @@ func (s *schemaSnapshot) updatePartition(tbl *model.TableInfo) error { zap.Int64("add partition id", partition.ID)) } s.partitionTable[partition.ID] = tbl - if !tbl.IsEligible(s.explicitTables) { + if !tbl.IsEligible(s.forceReplicate) { s.ineligibleTableID[partition.ID] = struct{}{} } delete(oldIDs, partition.ID) @@ -504,14 +504,20 @@ func (s *schemaSnapshot) createTable(table *model.TableInfo) error { s.tableInSchema[table.SchemaID] = tableInSchema s.tables[table.ID] = table - if !table.IsEligible(s.explicitTables) { - log.Warn("this table is not eligible to replicate", zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID)) + if !table.IsEligible(s.forceReplicate) { + // Sequence is not supported yet, and always ineligible. + // Skip Warn to avoid confusion. 
+ // See https://github.com/pingcap/tiflow/issues/4559 + if !table.IsSequence() { + log.Warn("this table is ineligible to replicate", + zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID)) + } s.ineligibleTableID[table.ID] = struct{}{} } if pi := table.GetPartitionInfo(); pi != nil { for _, partition := range pi.Definitions { s.partitionTable[partition.ID] = table - if !table.IsEligible(s.explicitTables) { + if !table.IsEligible(s.forceReplicate) { s.ineligibleTableID[partition.ID] = struct{}{} } } @@ -529,14 +535,20 @@ func (s *schemaSnapshot) replaceTable(table *model.TableInfo) error { return cerror.ErrSnapshotTableNotFound.GenWithStack("table %s(%d)", table.Name, table.ID) } s.tables[table.ID] = table - if !table.IsEligible(s.explicitTables) { - log.Warn("this table is not eligible to replicate", zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID)) + if !table.IsEligible(s.forceReplicate) { + // Sequence is not supported yet, and always ineligible. + // Skip Warn to avoid confusion. + // See https://github.com/pingcap/tiflow/issues/4559 + if !table.IsSequence() { + log.Warn("this table is ineligible to replicate", + zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID)) + } s.ineligibleTableID[table.ID] = struct{}{} } if pi := table.GetPartitionInfo(); pi != nil { for _, partition := range pi.Definitions { s.partitionTable[partition.ID] = table - if !table.IsEligible(s.explicitTables) { + if !table.IsEligible(s.forceReplicate) { s.ineligibleTableID[partition.ID] = struct{}{} } } @@ -673,7 +685,7 @@ type schemaStorageImpl struct { resolvedTs uint64 filter *filter.Filter - explicitTables bool + forceReplicate bool } // NewSchemaStorage creates a new schema storage @@ -692,7 +704,7 @@ func NewSchemaStorage(meta *timeta.Meta, startTs uint64, filter *filter.Filter, snaps: []*schemaSnapshot{snap}, resolvedTs: startTs, filter: filter, - explicitTables: forceReplicate, + forceReplicate: forceReplicate, } return schema, nil } @@ -769,7 +781,7 @@ func (s *schemaStorageImpl) HandleDDLJob(job *timodel.Job) error { } snap = lastSnap.Clone() } else { - snap = newEmptySchemaSnapshot(s.explicitTables) + snap = newEmptySchemaSnapshot(s.forceReplicate) } if err := snap.handleDDL(job); err != nil { return errors.Trace(err) diff --git a/cdc/entry/schema_storage_test.go b/cdc/entry/schema_storage_test.go index 23f63c2b4bf..abbd17349af 100644 --- a/cdc/entry/schema_storage_test.go +++ b/cdc/entry/schema_storage_test.go @@ -735,7 +735,7 @@ func (t *schemaSuite) TestSnapshotClone(c *check.C) { c.Assert(clone.truncateTableID, check.DeepEquals, snap.truncateTableID) c.Assert(clone.ineligibleTableID, check.DeepEquals, snap.ineligibleTableID) c.Assert(clone.currentTs, check.Equals, snap.currentTs) - c.Assert(clone.explicitTables, check.Equals, snap.explicitTables) + c.Assert(clone.forceReplicate, check.Equals, snap.forceReplicate) c.Assert(len(clone.tables), check.Equals, len(snap.tables)) c.Assert(len(clone.schemas), check.Equals, len(snap.schemas)) c.Assert(len(clone.partitionTable), check.Equals, len(snap.partitionTable)) diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index fd5094637f3..d2714f5c6fe 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -62,8 +62,7 @@ func Test(t *testing.T) { check.TestingT(t) } -type clientSuite struct { -} +type clientSuite struct{} var _ = check.Suite(&clientSuite{}) diff --git a/cdc/kv/resolvedts_heap_test.go b/cdc/kv/resolvedts_heap_test.go index 1e3c8fb8a47..4fe92e9b5f1 100644 --- 
a/cdc/kv/resolvedts_heap_test.go +++ b/cdc/kv/resolvedts_heap_test.go @@ -20,8 +20,7 @@ import ( "github.com/pingcap/tiflow/pkg/util/testleak" ) -type rtsHeapSuite struct { -} +type rtsHeapSuite struct{} var _ = check.Suite(&rtsHeapSuite{}) diff --git a/cdc/kv/token_region_test.go b/cdc/kv/token_region_test.go index a8d217996ef..a5ad7d5279e 100644 --- a/cdc/kv/token_region_test.go +++ b/cdc/kv/token_region_test.go @@ -26,8 +26,7 @@ import ( "golang.org/x/sync/errgroup" ) -type tokenRegionSuite struct { -} +type tokenRegionSuite struct{} var _ = check.Suite(&tokenRegionSuite{}) diff --git a/cdc/model/owner_test.go b/cdc/model/owner_test.go index d960963cdfe..b70053c3f73 100644 --- a/cdc/model/owner_test.go +++ b/cdc/model/owner_test.go @@ -162,7 +162,6 @@ var _ = check.Suite(&taskStatusSuite{}) func (s *taskStatusSuite) TestShouldBeDeepCopy(c *check.C) { defer testleak.AfterTest(c)() info := TaskStatus{ - Tables: map[TableID]*TableReplicaInfo{ 1: {StartTs: 100}, 2: {StartTs: 100}, diff --git a/cdc/model/schema_storage.go b/cdc/model/schema_storage.go index 909adac9739..d8cf828ef06 100644 --- a/cdc/model/schema_storage.go +++ b/cdc/model/schema_storage.go @@ -307,6 +307,11 @@ func (ti *TableInfo) ExistTableUniqueColumn() bool { // IsEligible returns whether the table is a eligible table func (ti *TableInfo) IsEligible(forceReplicate bool) bool { + // Sequence is not supported yet, TiCDC needs to filter all sequence tables. + // See https://github.com/pingcap/tiflow/issues/4559 + if ti.IsSequence() { + return false + } if forceReplicate { return true } diff --git a/cdc/model/schema_storage_test.go b/cdc/model/schema_storage_test.go index 5979546c707..13bbddb1058 100644 --- a/cdc/model/schema_storage_test.go +++ b/cdc/model/schema_storage_test.go @@ -401,9 +401,19 @@ func (s *schemaStorageSuite) TestTableInfoGetterFuncs(c *check.C) { info = WrapTableInfo(1, "test", 0, &t) c.Assert(info.IsEligible(false), check.IsFalse) c.Assert(info.IsEligible(true), check.IsTrue) + + // View is eligible. t.View = &timodel.ViewInfo{} info = WrapTableInfo(1, "test", 0, &t) + c.Assert(info.IsView(), check.IsTrue) c.Assert(info.IsEligible(false), check.IsTrue) + + // Sequence is ineligible. 
+ t.Sequence = &timodel.SequenceInfo{} + info = WrapTableInfo(1, "test", 0, &t) + c.Assert(info.IsSequence(), check.IsTrue) + c.Assert(info.IsEligible(false), check.IsFalse) + c.Assert(info.IsEligible(true), check.IsFalse) } func (s *schemaStorageSuite) TestTableInfoClone(c *check.C) { diff --git a/cdc/owner/barrier_test.go b/cdc/owner/barrier_test.go index 90be548eb0b..5b09cd030ba 100644 --- a/cdc/owner/barrier_test.go +++ b/cdc/owner/barrier_test.go @@ -27,8 +27,7 @@ func Test(t *testing.T) { check.TestingT(t) } var _ = check.Suite(&barrierSuite{}) -type barrierSuite struct { -} +type barrierSuite struct{} func (s *barrierSuite) TestBarrier(c *check.C) { defer testleak.AfterTest(c)() diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index 4ba09e15202..07283a09b58 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -112,11 +112,11 @@ func (m *mockDDLSink) Barrier(ctx context.Context) error { var _ = check.Suite(&changefeedSuite{}) -type changefeedSuite struct { -} +type changefeedSuite struct{} func createChangefeed4Test(ctx cdcContext.Context, c *check.C) (*changefeed, *model.ChangefeedReactorState, - map[model.CaptureID]*model.CaptureInfo, *orchestrator.ReactorStateTester) { + map[model.CaptureID]*model.CaptureInfo, *orchestrator.ReactorStateTester, +) { ctx.GlobalVars().PDClient = &gc.MockPDClient{ UpdateServiceGCSafePointFunc: func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { return safePoint, nil diff --git a/cdc/owner/ddl_puller_test.go b/cdc/owner/ddl_puller_test.go index f7ec756b63b..4be256a4c79 100644 --- a/cdc/owner/ddl_puller_test.go +++ b/cdc/owner/ddl_puller_test.go @@ -31,8 +31,7 @@ import ( var _ = check.Suite(&ddlPullerSuite{}) -type ddlPullerSuite struct { -} +type ddlPullerSuite struct{} type mockPuller struct { c *check.C diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go index 4a9ec487745..ae9a8c7671a 100644 --- a/cdc/owner/feed_state_manager_test.go +++ b/cdc/owner/feed_state_manager_test.go @@ -24,8 +24,7 @@ import ( var _ = check.Suite(&feedStateManagerSuite{}) -type feedStateManagerSuite struct { -} +type feedStateManagerSuite struct{} func (s *feedStateManagerSuite) TestHandleJob(c *check.C) { defer testleak.AfterTest(c)() diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go index b1b429eca4f..1795a0a53b5 100644 --- a/cdc/owner/owner_test.go +++ b/cdc/owner/owner_test.go @@ -34,8 +34,7 @@ import ( var _ = check.Suite(&ownerSuite{}) -type ownerSuite struct { -} +type ownerSuite struct{} type mockManager struct { gc.Manager @@ -328,22 +327,20 @@ func (s *ownerSuite) TestUpdateGCSafePoint(c *check.C) { tester := orchestrator.NewReactorStateTester(c, state, nil) // no changefeed, the gc safe point should be max uint64 - mockPDClient.UpdateServiceGCSafePointFunc = - func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { - // Owner will do a snapshot read at (checkpointTs - 1) from TiKV, - // set GC safepoint to (checkpointTs - 1) - c.Assert(safePoint, check.Equals, uint64(math.MaxUint64-1)) - return 0, nil - } + mockPDClient.UpdateServiceGCSafePointFunc = func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { + // Owner will do a snapshot read at (checkpointTs - 1) from TiKV, + // set GC safepoint to (checkpointTs - 1) + c.Assert(safePoint, check.Equals, uint64(math.MaxUint64-1)) + return 0, nil + } err := o.updateGCSafepoint(ctx, state) c.Assert(err, 
check.IsNil) // add a failed changefeed, it must not trigger update GC safepoint. - mockPDClient.UpdateServiceGCSafePointFunc = - func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { - c.Fatal("must not update") - return 0, nil - } + mockPDClient.UpdateServiceGCSafePointFunc = func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { + c.Fatal("must not update") + return 0, nil + } changefeedID1 := "changefeed-test1" tester.MustUpdate( fmt.Sprintf("/tidb/cdc/changefeed/info/%s", changefeedID1), @@ -360,15 +357,14 @@ func (s *ownerSuite) TestUpdateGCSafePoint(c *check.C) { // switch the state of changefeed to normal, it must update GC safepoint to // 1 (checkpoint Ts of changefeed-test1). ch := make(chan struct{}, 1) - mockPDClient.UpdateServiceGCSafePointFunc = - func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { - // Owner will do a snapshot read at (checkpointTs - 1) from TiKV, - // set GC safepoint to (checkpointTs - 1) - c.Assert(safePoint, check.Equals, uint64(1)) - c.Assert(serviceID, check.Equals, gc.CDCServiceSafePointID) - ch <- struct{}{} - return 0, nil - } + mockPDClient.UpdateServiceGCSafePointFunc = func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { + // Owner will do a snapshot read at (checkpointTs - 1) from TiKV, + // set GC safepoint to (checkpointTs - 1) + c.Assert(safePoint, check.Equals, uint64(1)) + c.Assert(serviceID, check.Equals, gc.CDCServiceSafePointID) + ch <- struct{}{} + return 0, nil + } state.Changefeeds[changefeedID1].PatchInfo( func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { info.State = model.StateNormal @@ -398,15 +394,14 @@ func (s *ownerSuite) TestUpdateGCSafePoint(c *check.C) { return &model.ChangeFeedStatus{CheckpointTs: 30}, true, nil }) tester.MustApplyPatches() - mockPDClient.UpdateServiceGCSafePointFunc = - func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { - // Owner will do a snapshot read at (checkpointTs - 1) from TiKV, - // set GC safepoint to (checkpointTs - 1) - c.Assert(safePoint, check.Equals, uint64(19)) - c.Assert(serviceID, check.Equals, gc.CDCServiceSafePointID) - ch <- struct{}{} - return 0, nil - } + mockPDClient.UpdateServiceGCSafePointFunc = func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { + // Owner will do a snapshot read at (checkpointTs - 1) from TiKV, + // set GC safepoint to (checkpointTs - 1) + c.Assert(safePoint, check.Equals, uint64(19)) + c.Assert(serviceID, check.Equals, gc.CDCServiceSafePointID) + ch <- struct{}{} + return 0, nil + } err = o.updateGCSafepoint(ctx, state) c.Assert(err, check.IsNil) select { diff --git a/cdc/owner/schema.go b/cdc/owner/schema.go index 9cd467191b1..1ceb9cdef53 100644 --- a/cdc/owner/schema.go +++ b/cdc/owner/schema.go @@ -116,39 +116,9 @@ func (s *schemaWrap4Owner) BuildDDLEvent(job *timodel.Job) (*model.DDLEvent, err return ddlEvent, nil } -func (s *schemaWrap4Owner) SinkTableInfos() []*model.SimpleTableInfo { - var sinkTableInfos []*model.SimpleTableInfo - for tableID := range s.schemaSnapshot.CloneTables() { - tblInfo, ok := s.schemaSnapshot.TableByID(tableID) - if !ok { - log.Panic("table not found for table ID", zap.Int64("tid", tableID)) - } - if s.shouldIgnoreTable(tblInfo) { - continue - } - dbInfo, ok := s.schemaSnapshot.SchemaByTableID(tableID) - if !ok { - log.Panic("schema not found for table ID", zap.Int64("tid", tableID)) 
- } - - // TODO separate function for initializing SimpleTableInfo - sinkTableInfo := new(model.SimpleTableInfo) - sinkTableInfo.Schema = dbInfo.Name.O - sinkTableInfo.TableID = tableID - sinkTableInfo.Table = tblInfo.TableName.Table - sinkTableInfo.ColumnInfo = make([]*model.ColumnInfo, len(tblInfo.Cols())) - for i, colInfo := range tblInfo.Cols() { - sinkTableInfo.ColumnInfo[i] = new(model.ColumnInfo) - sinkTableInfo.ColumnInfo[i].FromTiColumnInfo(colInfo) - } - sinkTableInfos = append(sinkTableInfos, sinkTableInfo) - } - return sinkTableInfos -} - -func (s *schemaWrap4Owner) shouldIgnoreTable(tableInfo *model.TableInfo) bool { - schemaName := tableInfo.TableName.Schema - tableName := tableInfo.TableName.Table +func (s *schemaWrap4Owner) shouldIgnoreTable(t *model.TableInfo) bool { + schemaName := t.TableName.Schema + tableName := t.TableName.Table if s.filter.ShouldIgnoreTable(schemaName, tableName) { return true } @@ -156,8 +126,14 @@ func (s *schemaWrap4Owner) shouldIgnoreTable(tableInfo *model.TableInfo) bool { // skip the mark table if cyclic is enabled return true } - if !tableInfo.IsEligible(s.config.ForceReplicate) { - log.Warn("skip ineligible table", zap.Int64("tid", tableInfo.ID), zap.Stringer("table", tableInfo.TableName)) + if !t.IsEligible(s.config.ForceReplicate) { + // Sequence is not supported yet, and always ineligible. + // Skip Warn to avoid confusion. + // See https://github.com/pingcap/tiflow/issues/4559 + if !t.IsSequence() { + log.Warn("skip ineligible table", + zap.Int64("tableID", t.ID), zap.Stringer("tableName", t.TableName)) + } return true } return false diff --git a/cdc/owner/schema_test.go b/cdc/owner/schema_test.go index e7386c19d2a..5951daa3964 100644 --- a/cdc/owner/schema_test.go +++ b/cdc/owner/schema_test.go @@ -28,8 +28,7 @@ import ( var _ = check.Suite(&schemaSuite{}) -type schemaSuite struct { -} +type schemaSuite struct{} func (s *schemaSuite) TestAllPhysicalTables(c *check.C) { defer testleak.AfterTest(c)() @@ -146,28 +145,3 @@ func (s *schemaSuite) TestBuildDDLEvent(c *check.C) { }, }) } - -func (s *schemaSuite) TestSinkTableInfos(c *check.C) { - defer testleak.AfterTest(c)() - helper := entry.NewSchemaTestHelper(c) - defer helper.Close() - ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope) - c.Assert(err, check.IsNil) - schema, err := newSchemaWrap4Owner(helper.Storage(), ver.Ver, config.GetDefaultReplicaConfig()) - c.Assert(err, check.IsNil) - // add normal table - job := helper.DDL2Job("create table test.t1(id int primary key)") - tableIDT1 := job.BinlogInfo.TableInfo.ID - c.Assert(schema.HandleDDL(job), check.IsNil) - // add ineligible table - job = helper.DDL2Job("create table test.t2(id int)") - c.Assert(schema.HandleDDL(job), check.IsNil) - c.Assert(schema.SinkTableInfos(), check.DeepEquals, []*model.SimpleTableInfo{ - { - Schema: "test", - Table: "t1", - TableID: tableIDT1, - ColumnInfo: []*model.ColumnInfo{{Name: "id", Type: mysql.TypeLong}}, - }, - }) -} diff --git a/cdc/processor/pipeline/puller.go b/cdc/processor/pipeline/puller.go index 751900afadd..5aab744b6c0 100644 --- a/cdc/processor/pipeline/puller.go +++ b/cdc/processor/pipeline/puller.go @@ -36,7 +36,8 @@ type pullerNode struct { } func newPullerNode( - tableID model.TableID, replicaInfo *model.TableReplicaInfo, tableName string) pipeline.Node { + tableID model.TableID, replicaInfo *model.TableReplicaInfo, tableName string, +) pipeline.Node { return &pullerNode{ tableID: tableID, replicaInfo: replicaInfo, diff --git a/cdc/processor/pipeline/sink_test.go 
b/cdc/processor/pipeline/sink_test.go index 75b12a5ba4c..b9c367a08c4 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -95,7 +95,8 @@ func (s *mockSink) Barrier(ctx context.Context, tableID model.TableID) error { func (s *mockSink) Check(c *check.C, expected []struct { resolvedTs model.Ts row *model.RowChangedEvent -}) { +}, +) { c.Assert(s.received, check.DeepEquals, expected) } diff --git a/cdc/processor/pipeline/table.go b/cdc/processor/pipeline/table.go index 296051b37f1..d0c208643a4 100644 --- a/cdc/processor/pipeline/table.go +++ b/cdc/processor/pipeline/table.go @@ -165,7 +165,8 @@ func NewTablePipeline(ctx cdcContext.Context, tableName string, replicaInfo *model.TableReplicaInfo, sink sink.Sink, - targetTs model.Ts) TablePipeline { + targetTs model.Ts, +) TablePipeline { ctx, cancel := cdcContext.WithCancel(ctx) tablePipeline := &tablePipelineImpl{ tableID: tableID, diff --git a/cdc/puller/puller_test.go b/cdc/puller/puller_test.go index b9fceed79b7..b374b7fe30a 100644 --- a/cdc/puller/puller_test.go +++ b/cdc/puller/puller_test.go @@ -35,8 +35,7 @@ import ( pd "github.com/tikv/pd/client" ) -type pullerSuite struct { -} +type pullerSuite struct{} var _ = check.Suite(&pullerSuite{}) diff --git a/cdc/puller/sorter/serde.go b/cdc/puller/sorter/serde.go index 990a2a2f8c6..48d0f348dd5 100644 --- a/cdc/puller/sorter/serde.go +++ b/cdc/puller/sorter/serde.go @@ -18,8 +18,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" ) -type msgPackGenSerde struct { -} +type msgPackGenSerde struct{} func (m *msgPackGenSerde) marshal(event *model.PolymorphicEvent, bytes []byte) ([]byte, error) { bytes = bytes[:0] diff --git a/cdc/puller/sorter/unified_sorter.go b/cdc/puller/sorter/unified_sorter.go index 19dff63c1d0..66973a15c7c 100644 --- a/cdc/puller/sorter/unified_sorter.go +++ b/cdc/puller/sorter/unified_sorter.go @@ -52,8 +52,7 @@ type metricsInfo struct { captureAddr string } -type ctxKey struct { -} +type ctxKey struct{} // UnifiedSorterCheckDir checks whether the directory needed exists and is writable. // If it does not exist, we try to create one. 
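Note: most of the test-file hunks in this patch, including the two on either side of this point, are mechanical formatting cleanups rather than behavior changes: empty structs collapse onto one line, and multi-line parameter lists gain a trailing comma so the closing parenthesis can sit on its own line. A minimal self-contained Go sketch of the two conventions (illustrative names only, not code from the repository):

package main

import "fmt"

// Empty suite/key structs are written on a single line.
type exampleSuite struct{}

// When a parameter list spans several lines, the closing parenthesis moves to
// its own line, so the final parameter carries a trailing comma.
func newExampleNode(
	tableID int64, tableName string,
) string {
	return fmt.Sprintf("%d:%s", tableID, tableName)
}

func main() {
	_ = exampleSuite{}
	fmt.Println(newExampleNode(1, "t1"))
}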
@@ -89,7 +88,8 @@ func NewUnifiedSorter( changeFeedID model.ChangeFeedID, tableName string, tableID model.TableID, - captureAddr string) (*UnifiedSorter, error) { + captureAddr string, +) (*UnifiedSorter, error) { poolMu.Lock() defer poolMu.Unlock() diff --git a/cdc/sink/codec/canal_flat_test.go b/cdc/sink/codec/canal_flat_test.go index 6813a231983..7fb90a06dca 100644 --- a/cdc/sink/codec/canal_flat_test.go +++ b/cdc/sink/codec/canal_flat_test.go @@ -24,8 +24,7 @@ import ( "golang.org/x/text/encoding/charmap" ) -type canalFlatSuite struct { -} +type canalFlatSuite struct{} var _ = check.Suite(&canalFlatSuite{}) diff --git a/cdc/sink/codec/interface_test.go b/cdc/sink/codec/interface_test.go index 21314cc378b..0afbe63065f 100644 --- a/cdc/sink/codec/interface_test.go +++ b/cdc/sink/codec/interface_test.go @@ -22,8 +22,7 @@ import ( "github.com/pingcap/tiflow/pkg/util/testleak" ) -type codecInterfaceSuite struct { -} +type codecInterfaceSuite struct{} var _ = check.Suite(&codecInterfaceSuite{}) diff --git a/cdc/sink/codec/schema_registry_test.go b/cdc/sink/codec/schema_registry_test.go index 967f43c58a9..99c5fe6b890 100644 --- a/cdc/sink/codec/schema_registry_test.go +++ b/cdc/sink/codec/schema_registry_test.go @@ -30,8 +30,7 @@ import ( "github.com/pingcap/tiflow/pkg/util/testleak" ) -type AvroSchemaRegistrySuite struct { -} +type AvroSchemaRegistrySuite struct{} var _ = check.Suite(&AvroSchemaRegistrySuite{}) diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index 35732438455..6edaa965eb2 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -418,7 +418,8 @@ func (k *mqSink) writeToProducer(ctx context.Context, message *codec.MQMessage, func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, replicaConfig *config.ReplicaConfig, - opts map[string]string, errCh chan error) (*mqSink, error) { + opts map[string]string, errCh chan error, +) (*mqSink, error) { producerConfig := kafka.NewConfig() if err := kafka.CompleteConfigsAndOpts(sinkURI, producerConfig, replicaConfig, opts); err != nil { return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) @@ -442,7 +443,8 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, } func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, - replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error) (*mqSink, error) { + replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error, +) (*mqSink, error) { producer, err := pulsar.NewProducer(sinkURI, errCh) if err != nil { return nil, errors.Trace(err) diff --git a/cdc/sink/simple_mysql_tester.go b/cdc/sink/simple_mysql_tester.go index d8ef81f9f1f..f79c2edf4ca 100644 --- a/cdc/sink/simple_mysql_tester.go +++ b/cdc/sink/simple_mysql_tester.go @@ -36,7 +36,8 @@ import ( func init() { failpoint.Inject("SimpleMySQLSinkTester", func() { sinkIniterMap["simple-mysql"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL, - filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) { + filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error, + ) (Sink, error) { return newSimpleMySQLSink(ctx, sinkURI, config) } }) diff --git a/cdc/sink/sink.go b/cdc/sink/sink.go index 5fd1366cec7..256e1ea8468 100644 --- a/cdc/sink/sink.go +++ b/cdc/sink/sink.go @@ -65,13 +65,15 @@ type sinkInitFunc func(context.Context, model.ChangeFeedID, *url.URL, *filter.Fi func init() { // register blackhole sink sinkIniterMap["blackhole"] 
= func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL, - filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) { + filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error, + ) (Sink, error) { return newBlackHoleSink(ctx), nil } // register mysql sink sinkIniterMap["mysql"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL, - filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) { + filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error, + ) (Sink, error) { return newMySQLSink(ctx, changefeedID, sinkURI, filter, config, opts) } sinkIniterMap["tidb"] = sinkIniterMap["mysql"] @@ -80,27 +82,31 @@ func init() { // register kafka sink sinkIniterMap["kafka"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL, - filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) { + filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error, + ) (Sink, error) { return newKafkaSaramaSink(ctx, sinkURI, filter, config, opts, errCh) } sinkIniterMap["kafka+ssl"] = sinkIniterMap["kafka"] // register pulsar sink sinkIniterMap["pulsar"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL, - filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) { + filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error, + ) (Sink, error) { return newPulsarSink(ctx, sinkURI, filter, config, opts, errCh) } sinkIniterMap["pulsar+ssl"] = sinkIniterMap["pulsar"] // register local sink sinkIniterMap["local"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL, - filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) { + filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error, + ) (Sink, error) { return cdclog.NewLocalFileSink(ctx, sinkURI, errCh) } // register s3 sink sinkIniterMap["s3"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL, - filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) { + filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error, + ) (Sink, error) { return cdclog.NewS3Sink(ctx, sinkURI, errCh) } } diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index 45676e6e815..cd12e1868cf 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -438,8 +438,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram if row.Table.IsPartition { partitionID = row.Table.TableID } - row.Table.TableID = - c.fakeTableIDGenerator.generateFakeTableID(row.Table.Schema, row.Table.Table, partitionID) + row.Table.TableID = c.fakeTableIDGenerator.generateFakeTableID(row.Table.Schema, row.Table.Table, partitionID) err = sink.EmitRowChangedEvents(ctx, row) if err != nil { log.Panic("emit row changed event failed", zap.Error(err)) diff --git a/pkg/cmd/cli/cli_changefeed_cyclic_create_marktables.go b/pkg/cmd/cli/cli_changefeed_cyclic_create_marktables.go index 04d68db8392..646d6441318 100644 --- a/pkg/cmd/cli/cli_changefeed_cyclic_create_marktables.go +++ 
b/pkg/cmd/cli/cli_changefeed_cyclic_create_marktables.go @@ -119,19 +119,18 @@ func (o *cyclicCreateMarktablesOptions) run(cmd *cobra.Command) error { func newCmdCyclicCreateMarktables(f factory.Factory) *cobra.Command { o := newCyclicCreateMarktablesOptions() - command := - &cobra.Command{ - Use: "create-marktables", - Short: "Create cyclic replication mark tables", - RunE: func(cmd *cobra.Command, args []string) error { - err := o.complete(f) - if err != nil { - return err - } - - return o.run(cmd) - }, - } + command := &cobra.Command{ + Use: "create-marktables", + Short: "Create cyclic replication mark tables", + RunE: func(cmd *cobra.Command, args []string) error { + err := o.complete(f) + if err != nil { + return err + } + + return o.run(cmd) + }, + } o.addFlags(command) diff --git a/pkg/cmd/cli/cli_changefeed_helper.go b/pkg/cmd/cli/cli_changefeed_helper.go index 4e63b82c08e..8780a025ab1 100644 --- a/pkg/cmd/cli/cli_changefeed_helper.go +++ b/pkg/cmd/cli/cli_changefeed_helper.go @@ -22,12 +22,11 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tiflow/cdc" - "github.com/pingcap/tiflow/cdc/entry" + captureAPI "github.com/pingcap/tiflow/cdc/capture" "github.com/pingcap/tiflow/cdc/kv" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/cmd/util" "github.com/pingcap/tiflow/pkg/config" - "github.com/pingcap/tiflow/pkg/filter" "github.com/pingcap/tiflow/pkg/httputil" "github.com/pingcap/tiflow/pkg/security" "github.com/spf13/cobra" @@ -85,33 +84,7 @@ func getTables(cliPdAddr string, credential *security.Credential, cfg *config.Re return nil, nil, err } - meta, err := kv.GetSnapshotMeta(kvStore, startTs) - if err != nil { - return nil, nil, errors.Trace(err) - } - - filter, err := filter.NewFilter(cfg) - if err != nil { - return nil, nil, errors.Trace(err) - } - - snap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, startTs, false /* explicitTables */) - if err != nil { - return nil, nil, errors.Trace(err) - } - - for _, tableInfo := range snap.Tables() { - if filter.ShouldIgnoreTable(tableInfo.TableName.Schema, tableInfo.TableName.Table) { - continue - } - if !tableInfo.IsEligible(false /* forceReplicate */) { - ineligibleTables = append(ineligibleTables, tableInfo.TableName) - } else { - eligibleTables = append(eligibleTables, tableInfo.TableName) - } - } - - return + return captureAPI.VerifyTables(cfg, kvStore, startTs) } // sendOwnerChangefeedQuery sends owner changefeed query request. diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index ac4e0003f2c..9c0642778de 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -220,7 +220,8 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR close(outCh) log.Info("WatchWithChan exited", zap.String("role", role)) }() - var lastRevision int64 + // get initial revision from opts to avoid revision fall back + lastRevision := getRevisionFromWatchOpts(opts...) 
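Note: the intent of this hunk, as the TestRevisionNotFallBack test below confirms, is that the caller's start revision survives internal watch resets: lastRevision is seeded from the watch options, and the re-created watch resumes at WithRev(lastRevision) instead of lastRevision+1, so a reset that fires before any response has been received no longer falls back to revision 0. A sketch of how a caller is expected to pass the start revision (the key, role string, and channel buffer size are illustrative, not taken from the patch):

package example

import (
	"context"

	"github.com/pingcap/tiflow/pkg/etcd"
	"go.etcd.io/etcd/clientv3"
)

// watchFrom starts WatchWithChan at a known revision. If the inner watch is
// reset (for example after a timeout) before any response has arrived,
// lastRevision is already rev, so the watch resumes from rev instead of 0.
func watchFrom(ctx context.Context, cli *etcd.Client, rev int64) <-chan clientv3.WatchResponse {
	outCh := make(chan clientv3.WatchResponse, 16)
	go cli.WatchWithChan(ctx, outCh, "/tidb/cdc/changefeed", "cli",
		clientv3.WithPrefix(), clientv3.WithRev(rev))
	return outCh
}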
watchCtx, cancel := context.WithCancel(ctx) defer func() { // Using closures to handle changes to the cancel function @@ -275,7 +276,7 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR watchCtx, cancel = context.WithCancel(ctx) // to avoid possible context leak warning from govet _ = cancel - watchCh = c.cli.Watch(watchCtx, key, clientv3.WithPrefix(), clientv3.WithRev(lastRevision+1)) + watchCh = c.cli.Watch(watchCtx, key, clientv3.WithPrefix(), clientv3.WithRev(lastRevision)) // we need to reset lastReceivedResponseTime after reset Watch lastReceivedResponseTime = c.clock.Now() } diff --git a/pkg/etcd/client_test.go b/pkg/etcd/client_test.go index 72a295affb3..57030df3f4e 100644 --- a/pkg/etcd/client_test.go +++ b/pkg/etcd/client_test.go @@ -15,6 +15,7 @@ package etcd import ( "context" + "sync/atomic" "time" "github.com/benbjohnson/clock" @@ -25,8 +26,7 @@ import ( "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" ) -type clientSuite struct { -} +type clientSuite struct{} var _ = check.Suite(&clientSuite{}) @@ -54,17 +54,23 @@ func (m *mockClient) Txn(ctx context.Context) clientv3.Txn { type mockWatcher struct { clientv3.Watcher watchCh chan clientv3.WatchResponse - resetCount *int - requestCount *int + resetCount *int32 + requestCount *int32 + rev *int64 } func (m mockWatcher) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan { - *m.resetCount++ + atomic.AddInt32(m.resetCount, 1) + op := &clientv3.Op{} + for _, opt := range opts { + opt(op) + } + atomic.StoreInt64(m.rev, op.Rev()) return m.watchCh } func (m mockWatcher) RequestProgress(ctx context.Context) error { - *m.requestCount++ + atomic.AddInt32(m.requestCount, 1) return nil } @@ -146,10 +152,11 @@ func (s *etcdSuite) TestWatchChBlocked(c *check.C) { defer testleak.AfterTest(c)() defer s.TearDownTest(c) cli := clientv3.NewCtxClient(context.TODO()) - resetCount := 0 - requestCount := 0 + resetCount := int32(0) + requestCount := int32(0) + rev := int64(0) watchCh := make(chan clientv3.WatchResponse, 1) - watcher := mockWatcher{watchCh: watchCh, resetCount: &resetCount, requestCount: &requestCount} + watcher := mockWatcher{watchCh: watchCh, resetCount: &resetCount, requestCount: &requestCount, rev: &rev} cli.Watcher = watcher sentRes := []clientv3.WatchResponse{ @@ -193,9 +200,9 @@ func (s *etcdSuite) TestWatchChBlocked(c *check.C) { c.Check(sentRes, check.DeepEquals, receivedRes) // make sure watchCh has been reset since timeout - c.Assert(*watcher.resetCount > 1, check.IsTrue) + c.Assert(atomic.LoadInt32(watcher.resetCount) > 1, check.IsTrue) // make sure RequestProgress has been call since timeout - c.Assert(*watcher.requestCount > 1, check.IsTrue) + c.Assert(atomic.LoadInt32(watcher.requestCount) > 1, check.IsTrue) // make sure etcdRequestProgressDuration is less than etcdWatchChTimeoutDuration c.Assert(etcdRequestProgressDuration, check.Less, etcdWatchChTimeoutDuration) } @@ -206,10 +213,11 @@ func (s *etcdSuite) TestOutChBlocked(c *check.C) { defer s.TearDownTest(c) cli := clientv3.NewCtxClient(context.TODO()) - resetCount := 0 - requestCount := 0 + resetCount := int32(0) + requestCount := int32(0) + rev := int64(0) watchCh := make(chan clientv3.WatchResponse, 1) - watcher := mockWatcher{watchCh: watchCh, resetCount: &resetCount, requestCount: &requestCount} + watcher := mockWatcher{watchCh: watchCh, resetCount: &resetCount, requestCount: &requestCount, rev: &rev} cli.Watcher = watcher mockClock := clock.NewMock() @@ -251,6 +259,52 @@ func (s *etcdSuite) 
TestOutChBlocked(c *check.C) { c.Check(sentRes, check.DeepEquals, receivedRes) } +func (s *clientSuite) TestRevisionNotFallBack(c *check.C) { + defer testleak.AfterTest(c)() + cli := clientv3.NewCtxClient(context.TODO()) + + resetCount := int32(0) + requestCount := int32(0) + rev := int64(0) + watchCh := make(chan clientv3.WatchResponse, 1) + watcher := mockWatcher{watchCh: watchCh, resetCount: &resetCount, requestCount: &requestCount, rev: &rev} + cli.Watcher = watcher + mockClock := clock.NewMock() + watchCli := Wrap(cli, nil) + watchCli.clock = mockClock + + key := "testRevisionNotFallBack" + outCh := make(chan clientv3.WatchResponse, 1) + // watch from revision = 2 + revision := int64(2) + + sentRes := []clientv3.WatchResponse{ + {CompactRevision: 1}, + } + + go func() { + for _, r := range sentRes { + watchCh <- r + } + }() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) + defer cancel() + go func() { + watchCli.WatchWithChan(ctx, outCh, key, "test", clientv3.WithPrefix(), clientv3.WithRev(revision)) + }() + // wait for WatchWithChan set up + <-outCh + // move time forward + mockClock.Add(time.Second * 30) + // make sure watchCh has been reset since timeout + c.Assert(atomic.LoadInt32(watcher.resetCount) > 1, check.IsTrue) + // make sure revision in WatchWitchChan does not fall back + // even if there has not any response been received from WatchCh + // while WatchCh was reset + c.Assert(atomic.LoadInt64(watcher.rev), check.Equals, revision) +} + type mockTxn struct { ctx context.Context mode int diff --git a/pkg/etcd/util.go b/pkg/etcd/util.go new file mode 100644 index 00000000000..e70ec080edf --- /dev/null +++ b/pkg/etcd/util.go @@ -0,0 +1,24 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcd + +import "go.etcd.io/etcd/clientv3" + +func getRevisionFromWatchOpts(opts ...clientv3.OpOption) int64 { + op := &clientv3.Op{} + for _, opt := range opts { + opt(op) + } + return op.Rev() +} diff --git a/pkg/etcd/util_test.go b/pkg/etcd/util_test.go new file mode 100644 index 00000000000..afc6d0d7e3c --- /dev/null +++ b/pkg/etcd/util_test.go @@ -0,0 +1,35 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+package etcd + +import ( + "math" + "math/rand" + + "github.com/pingcap/check" + "github.com/pingcap/tiflow/pkg/util/testleak" + "go.etcd.io/etcd/clientv3" +) + +type utilSuit struct{} + +var _ = check.Suite(&utilSuit{}) + +func (s utilSuit) TestGetRevisionFromWatchOpts(c *check.C) { + defer testleak.AfterTest(c)() + for i := 0; i < 100; i++ { + rev := rand.Int63n(math.MaxInt64) + opt := clientv3.WithRev(rev) + c.Assert(getRevisionFromWatchOpts(opt), check.Equals, rev) + } +} diff --git a/pkg/filter/filter_test.go b/pkg/filter/filter_test.go index 8408413031a..d2d243c06f1 100644 --- a/pkg/filter/filter_test.go +++ b/pkg/filter/filter_test.go @@ -138,6 +138,11 @@ func (s *filterSuite) TestShouldDiscardDDL(c *check.C) { c.Assert(filter.ShouldDiscardDDL(model.ActionDropSchema), check.IsFalse) c.Assert(filter.ShouldDiscardDDL(model.ActionAddForeignKey), check.IsFalse) c.Assert(filter.ShouldDiscardDDL(model.ActionCreateSequence), check.IsTrue) + + // Discard sequence DDL. + c.Assert(filter.ShouldDiscardDDL(model.ActionCreateSequence), check.IsTrue) + c.Assert(filter.ShouldDiscardDDL(model.ActionAlterSequence), check.IsTrue) + c.Assert(filter.ShouldDiscardDDL(model.ActionDropSequence), check.IsTrue) } func (s *filterSuite) TestShouldIgnoreDDL(c *check.C) { diff --git a/pkg/orchestrator/etcd_worker_test.go b/pkg/orchestrator/etcd_worker_test.go index d51de27bfe3..2364254fa0d 100644 --- a/pkg/orchestrator/etcd_worker_test.go +++ b/pkg/orchestrator/etcd_worker_test.go @@ -48,8 +48,7 @@ func Test(t *testing.T) { check.TestingT(t) } var _ = check.Suite(&etcdWorkerSuite{}) -type etcdWorkerSuite struct { -} +type etcdWorkerSuite struct{} type simpleReactor struct { state *simpleReactorState diff --git a/pkg/orchestrator/util/key_utils_test.go b/pkg/orchestrator/util/key_utils_test.go index 5fc6fbc08dd..d84e3304382 100644 --- a/pkg/orchestrator/util/key_utils_test.go +++ b/pkg/orchestrator/util/key_utils_test.go @@ -24,8 +24,7 @@ func Test(t *testing.T) { check.TestingT(t) } var _ = check.Suite(&keyUtilsSuite{}) -type keyUtilsSuite struct { -} +type keyUtilsSuite struct{} func (s *keyUtilsSuite) TestEtcdKey(c *check.C) { defer testleak.AfterTest(c)() diff --git a/pkg/pipeline/node.go b/pkg/pipeline/node.go index 3f2dd7faa66..a33967c5b1f 100644 --- a/pkg/pipeline/node.go +++ b/pkg/pipeline/node.go @@ -17,7 +17,6 @@ package pipeline // The following functions in this interface will be called in one goroutine. 
// It's NO NEED to consider concurrency issues type Node interface { - // Init initializes the node // when the pipeline is started, this function will be called in order // you can call `ctx.SendToNextNode(msg)` to send the message to the next node diff --git a/pkg/pipeline/pipeline_test.go b/pkg/pipeline/pipeline_test.go index 4d868585dba..0a4e8039dd0 100644 --- a/pkg/pipeline/pipeline_test.go +++ b/pkg/pipeline/pipeline_test.go @@ -35,8 +35,7 @@ type pipelineSuite struct{} var _ = check.Suite(&pipelineSuite{}) -type echoNode struct { -} +type echoNode struct{} func (e echoNode) Init(ctx NodeContext) error { ctx.SendToNextNode(PolymorphicEventMessage(&model.PolymorphicEvent{ @@ -435,8 +434,7 @@ func (s *pipelineSuite) TestPipelineAppendNode(c *check.C) { p.Wait() } -type panicNode struct { -} +type panicNode struct{} func (e panicNode) Init(ctx NodeContext) error { panic("panic in panicNode") diff --git a/pkg/scheduler/table_number.go b/pkg/scheduler/table_number.go index 7bfe59b9ace..637eb7aff4d 100644 --- a/pkg/scheduler/table_number.go +++ b/pkg/scheduler/table_number.go @@ -44,7 +44,8 @@ func (t *TableNumberScheduler) Skewness() float64 { // CalRebalanceOperates implements the Scheduler interface func (t *TableNumberScheduler) CalRebalanceOperates(targetSkewness float64) ( - skewness float64, moveTableJobs map[model.TableID]*model.MoveTableJob) { + skewness float64, moveTableJobs map[model.TableID]*model.MoveTableJob, +) { var totalTableNumber uint64 for _, captureWorkloads := range t.workloads { totalTableNumber += uint64(len(captureWorkloads)) diff --git a/pkg/txnutil/gc/gc_manager_test.go b/pkg/txnutil/gc/gc_manager_test.go index c415ebf6b17..368f879cd39 100644 --- a/pkg/txnutil/gc/gc_manager_test.go +++ b/pkg/txnutil/gc/gc_manager_test.go @@ -34,8 +34,7 @@ func Test(t *testing.T) { var _ = check.Suite(&gcManagerSuite{}) -type gcManagerSuite struct { -} +type gcManagerSuite struct{} func (s *gcManagerSuite) TestUpdateGCSafePoint(c *check.C) { defer testleak.AfterTest(c)() diff --git a/pkg/workerpool/hash.go b/pkg/workerpool/hash.go index 9af534ca8a5..8c04f74a383 100644 --- a/pkg/workerpool/hash.go +++ b/pkg/workerpool/hash.go @@ -24,8 +24,7 @@ type Hashable interface { HashCode() int64 } -type defaultHasher struct { -} +type defaultHasher struct{} // Hash returns the hash value. func (m *defaultHasher) Hash(object Hashable) int64 { diff --git a/tests/integration_tests/sequence/conf/diff_config.toml b/tests/integration_tests/sequence/conf/diff_config.toml new file mode 100644 index 00000000000..cbb7eee0d11 --- /dev/null +++ b/tests/integration_tests/sequence/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. 
+ +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/tidb_cdc_test/sequence/sync_diff/output" + + source-instances = ["mysql1"] + + target-instance = "tidb0" + + target-check-tables = ["sequence_test.t1"] + +[data-sources] +[data-sources.mysql1] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + +[data-sources.tidb0] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/integration_tests/sequence/conf/force_replicate.toml b/tests/integration_tests/sequence/conf/force_replicate.toml new file mode 100644 index 00000000000..dba5fc404ab --- /dev/null +++ b/tests/integration_tests/sequence/conf/force_replicate.toml @@ -0,0 +1 @@ +force-replicate=true diff --git a/tests/integration_tests/sequence/data/test.sql b/tests/integration_tests/sequence/data/test.sql new file mode 100644 index 00000000000..8b98c97f726 --- /dev/null +++ b/tests/integration_tests/sequence/data/test.sql @@ -0,0 +1,25 @@ +drop database if exists `sequence_test`; +create database `sequence_test`; +use `sequence_test`; + +CREATE SEQUENCE seq0 start with 1 minvalue 1 maxvalue 999999999999999 increment by 1 nocache cycle; + +-- select seq0 +SELECT next value for seq0; +-- select again +SELECT next value for seq0; + +-- t1 refers seq0 +-- note that only TiDB supports it. +CREATE TABLE t1 ( + id VARCHAR(255), + a INT default next value for seq0, + PRIMARY KEY(id) +); + +-- TiCDC is able to replicate following changes to TiDB. +INSERT INTO t1 (id) VALUES ('111'); +INSERT INTO t1 (id) VALUES ('222'); +INSERT INTO t1 (id) VALUES ('333'); +UPDATE t1 SET id = '10' WHERE id = '111'; +DELETE FROM t1 WHERE a = 222; diff --git a/tests/integration_tests/sequence/run.sh b/tests/integration_tests/sequence/run.sh new file mode 100755 index 00000000000..702131d6015 --- /dev/null +++ b/tests/integration_tests/sequence/run.sh @@ -0,0 +1,52 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +function run() { + # No need to test kafka. + if [ "$SINK_TYPE" == "kafka" ]; then + return + fi + + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + # record tso before we create tables to skip the system table DDLs + start_ts=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1) + + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + SINK_URI="mysql://normal:123456@127.0.0.1:3306/" + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config $CUR/conf/force_replicate.toml + + run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + # sync_diff can't check non-exist table, so we check expected tables are created in downstream first + + check_table_exists sequence_test.t1 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + echo "check table exists success" + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 60 + + # TiCDC discards all SEQUENCE DDL for now. + # See https://github.com/pingcap/tiflow/issues/4559 + ! run_sql "SHOW CREATE SEQUENCE sequence_test.seq0;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + run_sql "DROP SEQUENCE sequence_test.seq0;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + + # Make sure changefeed is normal. 
+ run_sql "CREATE table sequence_test.mark_table(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + check_table_exists "sequence_test.mark_table" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
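Taken together, the Go-side changes make sequence objects invisible to replication regardless of force-replicate: TableInfo.IsEligible now returns false for sequences, and VerifyTables and the schema snapshot skip them without emitting the usual ineligible-table warning. A condensed sketch of the resulting classification logic (a paraphrase of VerifyTables under those assumptions, not a verbatim extract from the patch):

package example

import "github.com/pingcap/tiflow/cdc/model"

// classify buckets tables the way the patch does: sequences are dropped
// silently, other ineligible tables (no primary key or usable unique index)
// are reported, and everything else is replicated.
func classify(tables []*model.TableInfo, forceReplicate bool) (eligible, ineligible []model.TableName) {
	for _, ti := range tables {
		if ti.IsSequence() {
			// Sequence is not supported yet; see pingcap/tiflow#4559.
			continue
		}
		if !ti.IsEligible(forceReplicate) {
			ineligible = append(ineligible, ti.TableName)
			continue
		}
		eligible = append(eligible, ti.TableName)
	}
	return
}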