Skip to content

Commit

Permalink
etcd/client (ticdc): Prevent revision in WatchWitchChan fallback. (pi…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Apr 14, 2022
1 parent ee1ad70 commit 9820578
Show file tree
Hide file tree
Showing 37 changed files with 224 additions and 125 deletions.
6 changes: 2 additions & 4 deletions cdc/entry/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ import (

func Test(t *testing.T) { check.TestingT(t) }

type codecSuite struct {
}
type codecSuite struct{}

var _ = check.Suite(&codecSuite{})

Expand All @@ -43,8 +42,7 @@ func (s *codecSuite) TestDecodeRecordKey(c *check.C) {
c.Assert(len(key), check.Equals, 0)
}

type decodeMetaKeySuite struct {
}
type decodeMetaKeySuite struct{}

var _ = check.Suite(&decodeMetaKeySuite{})

Expand Down
3 changes: 2 additions & 1 deletion cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ func testMounterDisableOldValue(c *check.C, tc struct {
tableName string
createTableDDL string
values [][]interface{}
}) {
},
) {
store, err := mockstore.NewMockStore()
c.Assert(err, check.IsNil)
defer store.Close() //nolint:errcheck
Expand Down
3 changes: 1 addition & 2 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ func Test(t *testing.T) {
check.TestingT(t)
}

type clientSuite struct {
}
type clientSuite struct{}

var _ = check.Suite(&clientSuite{})

Expand Down
3 changes: 1 addition & 2 deletions cdc/kv/resolvedts_heap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import (
"github.com/pingcap/tiflow/pkg/util/testleak"
)

type rtsHeapSuite struct {
}
type rtsHeapSuite struct{}

var _ = check.Suite(&rtsHeapSuite{})

Expand Down
3 changes: 1 addition & 2 deletions cdc/kv/token_region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ import (
"golang.org/x/sync/errgroup"
)

type tokenRegionSuite struct {
}
type tokenRegionSuite struct{}

var _ = check.Suite(&tokenRegionSuite{})

Expand Down
1 change: 0 additions & 1 deletion cdc/model/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ var _ = check.Suite(&taskStatusSuite{})
func (s *taskStatusSuite) TestShouldBeDeepCopy(c *check.C) {
defer testleak.AfterTest(c)()
info := TaskStatus{

Tables: map[TableID]*TableReplicaInfo{
1: {StartTs: 100},
2: {StartTs: 100},
Expand Down
3 changes: 1 addition & 2 deletions cdc/owner/barrier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ func Test(t *testing.T) { check.TestingT(t) }

var _ = check.Suite(&barrierSuite{})

type barrierSuite struct {
}
type barrierSuite struct{}

func (s *barrierSuite) TestBarrier(c *check.C) {
defer testleak.AfterTest(c)()
Expand Down
6 changes: 3 additions & 3 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ func (m *mockDDLSink) Barrier(ctx context.Context) error {

var _ = check.Suite(&changefeedSuite{})

type changefeedSuite struct {
}
type changefeedSuite struct{}

func createChangefeed4Test(ctx cdcContext.Context, c *check.C) (*changefeed, *model.ChangefeedReactorState,
map[model.CaptureID]*model.CaptureInfo, *orchestrator.ReactorStateTester) {
map[model.CaptureID]*model.CaptureInfo, *orchestrator.ReactorStateTester,
) {
ctx.GlobalVars().PDClient = &gc.MockPDClient{
UpdateServiceGCSafePointFunc: func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
return safePoint, nil
Expand Down
3 changes: 1 addition & 2 deletions cdc/owner/ddl_puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ import (

var _ = check.Suite(&ddlPullerSuite{})

type ddlPullerSuite struct {
}
type ddlPullerSuite struct{}

type mockPuller struct {
c *check.C
Expand Down
3 changes: 1 addition & 2 deletions cdc/owner/feed_state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import (

var _ = check.Suite(&feedStateManagerSuite{})

type feedStateManagerSuite struct {
}
type feedStateManagerSuite struct{}

func (s *feedStateManagerSuite) TestHandleJob(c *check.C) {
defer testleak.AfterTest(c)()
Expand Down
59 changes: 27 additions & 32 deletions cdc/owner/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ import (

var _ = check.Suite(&ownerSuite{})

type ownerSuite struct {
}
type ownerSuite struct{}

type mockManager struct {
gc.Manager
Expand Down Expand Up @@ -328,22 +327,20 @@ func (s *ownerSuite) TestUpdateGCSafePoint(c *check.C) {
tester := orchestrator.NewReactorStateTester(c, state, nil)

// no changefeed, the gc safe point should be max uint64
mockPDClient.UpdateServiceGCSafePointFunc =
func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
// Owner will do a snapshot read at (checkpointTs - 1) from TiKV,
// set GC safepoint to (checkpointTs - 1)
c.Assert(safePoint, check.Equals, uint64(math.MaxUint64-1))
return 0, nil
}
mockPDClient.UpdateServiceGCSafePointFunc = func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
// Owner will do a snapshot read at (checkpointTs - 1) from TiKV,
// set GC safepoint to (checkpointTs - 1)
c.Assert(safePoint, check.Equals, uint64(math.MaxUint64-1))
return 0, nil
}
err := o.updateGCSafepoint(ctx, state)
c.Assert(err, check.IsNil)

// add a failed changefeed, it must not trigger update GC safepoint.
mockPDClient.UpdateServiceGCSafePointFunc =
func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
c.Fatal("must not update")
return 0, nil
}
mockPDClient.UpdateServiceGCSafePointFunc = func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
c.Fatal("must not update")
return 0, nil
}
changefeedID1 := "changefeed-test1"
tester.MustUpdate(
fmt.Sprintf("/tidb/cdc/changefeed/info/%s", changefeedID1),
Expand All @@ -360,15 +357,14 @@ func (s *ownerSuite) TestUpdateGCSafePoint(c *check.C) {
// switch the state of changefeed to normal, it must update GC safepoint to
// 1 (checkpoint Ts of changefeed-test1).
ch := make(chan struct{}, 1)
mockPDClient.UpdateServiceGCSafePointFunc =
func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
// Owner will do a snapshot read at (checkpointTs - 1) from TiKV,
// set GC safepoint to (checkpointTs - 1)
c.Assert(safePoint, check.Equals, uint64(1))
c.Assert(serviceID, check.Equals, gc.CDCServiceSafePointID)
ch <- struct{}{}
return 0, nil
}
mockPDClient.UpdateServiceGCSafePointFunc = func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
// Owner will do a snapshot read at (checkpointTs - 1) from TiKV,
// set GC safepoint to (checkpointTs - 1)
c.Assert(safePoint, check.Equals, uint64(1))
c.Assert(serviceID, check.Equals, gc.CDCServiceSafePointID)
ch <- struct{}{}
return 0, nil
}
state.Changefeeds[changefeedID1].PatchInfo(
func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
info.State = model.StateNormal
Expand Down Expand Up @@ -398,15 +394,14 @@ func (s *ownerSuite) TestUpdateGCSafePoint(c *check.C) {
return &model.ChangeFeedStatus{CheckpointTs: 30}, true, nil
})
tester.MustApplyPatches()
mockPDClient.UpdateServiceGCSafePointFunc =
func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
// Owner will do a snapshot read at (checkpointTs - 1) from TiKV,
// set GC safepoint to (checkpointTs - 1)
c.Assert(safePoint, check.Equals, uint64(19))
c.Assert(serviceID, check.Equals, gc.CDCServiceSafePointID)
ch <- struct{}{}
return 0, nil
}
mockPDClient.UpdateServiceGCSafePointFunc = func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
// Owner will do a snapshot read at (checkpointTs - 1) from TiKV,
// set GC safepoint to (checkpointTs - 1)
c.Assert(safePoint, check.Equals, uint64(19))
c.Assert(serviceID, check.Equals, gc.CDCServiceSafePointID)
ch <- struct{}{}
return 0, nil
}
err = o.updateGCSafepoint(ctx, state)
c.Assert(err, check.IsNil)
select {
Expand Down
3 changes: 1 addition & 2 deletions cdc/owner/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ import (

var _ = check.Suite(&schemaSuite{})

type schemaSuite struct {
}
type schemaSuite struct{}

func (s *schemaSuite) TestAllPhysicalTables(c *check.C) {
defer testleak.AfterTest(c)()
Expand Down
3 changes: 2 additions & 1 deletion cdc/processor/pipeline/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ type pullerNode struct {
}

func newPullerNode(
tableID model.TableID, replicaInfo *model.TableReplicaInfo, tableName string) pipeline.Node {
tableID model.TableID, replicaInfo *model.TableReplicaInfo, tableName string,
) pipeline.Node {
return &pullerNode{
tableID: tableID,
replicaInfo: replicaInfo,
Expand Down
3 changes: 2 additions & 1 deletion cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ func (s *mockSink) Barrier(ctx context.Context, tableID model.TableID) error {
func (s *mockSink) Check(c *check.C, expected []struct {
resolvedTs model.Ts
row *model.RowChangedEvent
}) {
},
) {
c.Assert(s.received, check.DeepEquals, expected)
}

Expand Down
3 changes: 2 additions & 1 deletion cdc/processor/pipeline/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ func NewTablePipeline(ctx cdcContext.Context,
tableName string,
replicaInfo *model.TableReplicaInfo,
sink sink.Sink,
targetTs model.Ts) TablePipeline {
targetTs model.Ts,
) TablePipeline {
ctx, cancel := cdcContext.WithCancel(ctx)
tablePipeline := &tablePipelineImpl{
tableID: tableID,
Expand Down
3 changes: 1 addition & 2 deletions cdc/puller/puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ import (
pd "github.com/tikv/pd/client"
)

type pullerSuite struct {
}
type pullerSuite struct{}

var _ = check.Suite(&pullerSuite{})

Expand Down
3 changes: 1 addition & 2 deletions cdc/puller/sorter/serde.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
)

type msgPackGenSerde struct {
}
type msgPackGenSerde struct{}

func (m *msgPackGenSerde) marshal(event *model.PolymorphicEvent, bytes []byte) ([]byte, error) {
bytes = bytes[:0]
Expand Down
6 changes: 3 additions & 3 deletions cdc/puller/sorter/unified_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ type metricsInfo struct {
captureAddr string
}

type ctxKey struct {
}
type ctxKey struct{}

// UnifiedSorterCheckDir checks whether the directory needed exists and is writable.
// If it does not exist, we try to create one.
Expand Down Expand Up @@ -89,7 +88,8 @@ func NewUnifiedSorter(
changeFeedID model.ChangeFeedID,
tableName string,
tableID model.TableID,
captureAddr string) (*UnifiedSorter, error) {
captureAddr string,
) (*UnifiedSorter, error) {
poolMu.Lock()
defer poolMu.Unlock()

Expand Down
3 changes: 1 addition & 2 deletions cdc/sink/codec/canal_flat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import (
"golang.org/x/text/encoding/charmap"
)

type canalFlatSuite struct {
}
type canalFlatSuite struct{}

var _ = check.Suite(&canalFlatSuite{})

Expand Down
3 changes: 1 addition & 2 deletions cdc/sink/codec/interface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ import (
"github.com/pingcap/tiflow/pkg/util/testleak"
)

type codecInterfaceSuite struct {
}
type codecInterfaceSuite struct{}

var _ = check.Suite(&codecInterfaceSuite{})

Expand Down
3 changes: 1 addition & 2 deletions cdc/sink/codec/schema_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ import (
"github.com/pingcap/tiflow/pkg/util/testleak"
)

type AvroSchemaRegistrySuite struct {
}
type AvroSchemaRegistrySuite struct{}

var _ = check.Suite(&AvroSchemaRegistrySuite{})

Expand Down
6 changes: 4 additions & 2 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,8 @@ func (k *mqSink) writeToProducer(ctx context.Context, message *codec.MQMessage,

func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL,
filter *filter.Filter, replicaConfig *config.ReplicaConfig,
opts map[string]string, errCh chan error) (*mqSink, error) {
opts map[string]string, errCh chan error,
) (*mqSink, error) {
producerConfig := kafka.NewConfig()
if err := kafka.CompleteConfigsAndOpts(sinkURI, producerConfig, replicaConfig, opts); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
Expand All @@ -442,7 +443,8 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL,
}

func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter,
replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error) (*mqSink, error) {
replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error,
) (*mqSink, error) {
producer, err := pulsar.NewProducer(sinkURI, errCh)
if err != nil {
return nil, errors.Trace(err)
Expand Down
3 changes: 2 additions & 1 deletion cdc/sink/simple_mysql_tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ import (
func init() {
failpoint.Inject("SimpleMySQLSinkTester", func() {
sinkIniterMap["simple-mysql"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) {
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error,
) (Sink, error) {
return newSimpleMySQLSink(ctx, sinkURI, config)
}
})
Expand Down
18 changes: 12 additions & 6 deletions cdc/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,15 @@ type sinkInitFunc func(context.Context, model.ChangeFeedID, *url.URL, *filter.Fi
func init() {
// register blackhole sink
sinkIniterMap["blackhole"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) {
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error,
) (Sink, error) {
return newBlackHoleSink(ctx), nil
}

// register mysql sink
sinkIniterMap["mysql"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) {
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error,
) (Sink, error) {
return newMySQLSink(ctx, changefeedID, sinkURI, filter, config, opts)
}
sinkIniterMap["tidb"] = sinkIniterMap["mysql"]
Expand All @@ -80,27 +82,31 @@ func init() {

// register kafka sink
sinkIniterMap["kafka"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) {
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error,
) (Sink, error) {
return newKafkaSaramaSink(ctx, sinkURI, filter, config, opts, errCh)
}
sinkIniterMap["kafka+ssl"] = sinkIniterMap["kafka"]

// register pulsar sink
sinkIniterMap["pulsar"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) {
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error,
) (Sink, error) {
return newPulsarSink(ctx, sinkURI, filter, config, opts, errCh)
}
sinkIniterMap["pulsar+ssl"] = sinkIniterMap["pulsar"]

// register local sink
sinkIniterMap["local"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) {
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error,
) (Sink, error) {
return cdclog.NewLocalFileSink(ctx, sinkURI, errCh)
}

// register s3 sink
sinkIniterMap["s3"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) {
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error,
) (Sink, error) {
return cdclog.NewS3Sink(ctx, sinkURI, errCh)
}
}
Expand Down
Loading

0 comments on commit 9820578

Please sign in to comment.