Skip to content

Commit

Permalink
Merge branch 'release-5.2' into cherry-pick-4192-to-release-5.2
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand authored Jan 13, 2022
2 parents 2a2111c + 2a587ec commit 5834426
Show file tree
Hide file tree
Showing 18 changed files with 332 additions and 187 deletions.
3 changes: 2 additions & 1 deletion cdc/capture/http_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ var httpBadRequestError = []*errors.Error{
cerror.ErrAPIInvalidParam, cerror.ErrSinkURIInvalid, cerror.ErrStartTsBeforeGC,
cerror.ErrChangeFeedNotExists, cerror.ErrTargetTsBeforeStartTs, cerror.ErrTableIneligible,
cerror.ErrFilterRuleInvalid, cerror.ErrChangefeedUpdateRefused, cerror.ErrMySQLConnectionError,
cerror.ErrMySQLInvalidConfig,
cerror.ErrMySQLInvalidConfig, cerror.ErrCaptureNotExist, cerror.ErrTaskStatusNotExists,
cerror.ErrTaskPositionNotExists,
}

// IsHTTPBadRequestError check if a error is a http bad request error
Expand Down
5 changes: 0 additions & 5 deletions cdc/owner/async_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ const (
// The EmitCheckpointTs and EmitDDLEvent is asynchronous function for now
// Other functions are still synchronization
type AsyncSink interface {
Initialize(ctx cdcContext.Context, tableInfo []*model.SimpleTableInfo) error
// EmitCheckpointTs emits the checkpoint Ts to downstream data source
// this function will return after recording the checkpointTs specified in memory immediately
// and the recorded checkpointTs will be sent and updated to downstream data source every second
Expand Down Expand Up @@ -101,10 +100,6 @@ func newAsyncSink(ctx cdcContext.Context) (AsyncSink, error) {
return asyncSink, nil
}

func (s *asyncSinkImpl) Initialize(ctx cdcContext.Context, tableInfo []*model.SimpleTableInfo) error {
return s.sink.Initialize(ctx, tableInfo)
}

func (s *asyncSinkImpl) run(ctx cdcContext.Context) {
defer s.wg.Done()
// TODO make the tick duration configurable
Expand Down
25 changes: 4 additions & 21 deletions cdc/owner/async_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,10 @@ type asyncSinkSuite struct {

type mockSink struct {
sink.Sink
initTableInfo []*model.SimpleTableInfo
checkpointTs model.Ts
ddl *model.DDLEvent
ddlMu sync.Mutex
ddlError error
}

func (m *mockSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
m.initTableInfo = tableInfo
return nil
checkpointTs model.Ts
ddl *model.DDLEvent
ddlMu sync.Mutex
ddlError error
}

func (m *mockSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
Expand Down Expand Up @@ -88,17 +82,6 @@ func newAsyncSink4Test(ctx cdcContext.Context, c *check.C) (cdcContext.Context,
return ctx, sink, mockSink
}

func (s *asyncSinkSuite) TestInitialize(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(false)
ctx, sink, mockSink := newAsyncSink4Test(ctx, c)
defer sink.Close(ctx)
tableInfos := []*model.SimpleTableInfo{{Schema: "test"}}
err := sink.Initialize(ctx, tableInfos)
c.Assert(err, check.IsNil)
c.Assert(tableInfos, check.DeepEquals, mockSink.initTableInfo)
}

func (s *asyncSinkSuite) TestCheckpoint(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(false)
Expand Down
5 changes: 1 addition & 4 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,7 @@ LOOP:
if err != nil {
return errors.Trace(err)
}
err = c.sink.Initialize(cancelCtx, c.schema.SinkTableInfos())
if err != nil {
return errors.Trace(err)
}

// Refer to the previous comment on why we use (checkpointTs-1).
c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs-1)
if err != nil {
Expand Down
11 changes: 10 additions & 1 deletion cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,21 @@ func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err
if err != nil {
return errors.Trace(err)
}

// we must call flowController.Release immediately after we call
// FlushRowChangedEvents to prevent deadlock cause by checkpointTs
// fall back
n.flowController.Release(checkpointTs)

// the checkpointTs may fall back in some situation such as:
// 1. This table is newly added to the processor
// 2. There is one table in the processor that has a smaller
// checkpointTs than this one
if checkpointTs <= n.checkpointTs {
return nil
}
atomic.StoreUint64(&n.checkpointTs, checkpointTs)

n.flowController.Release(checkpointTs)
return nil
}

Expand Down
62 changes: 58 additions & 4 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ func (c *mockFlowController) GetConsumption() uint64 {
return 0
}

func (s *mockSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
return nil
}

func (s *mockSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
for _, row := range rows {
s.received = append(s.received, struct {
Expand Down Expand Up @@ -559,3 +555,61 @@ func (s *outputSuite) TestSplitUpdateEventWhenDisableOldValue(c *check.C) {
c.Assert(node.eventBuffer[insertEventIndex].Row.Columns, check.HasLen, 2)
c.Assert(node.eventBuffer[insertEventIndex].Row.PreColumns, check.HasLen, 0)
}

type flushFlowController struct {
mockFlowController
releaseCounter int
}

func (c *flushFlowController) Release(resolvedTs uint64) {
c.releaseCounter++
}

type flushSink struct {
mockSink
}

// use to simulate the situation that resolvedTs return from sink manager
// fall back
var fallBackResolvedTs = uint64(10)

func (s *flushSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, resolvedTs uint64) (uint64, error) {
if resolvedTs == fallBackResolvedTs {
return 0, nil
}
return resolvedTs, nil
}

// TestFlushSinkReleaseFlowController tests sinkNode.flushSink method will always
// call flowController.Release to release the memory quota of the table to avoid
// deadlock if there is no error occur
func (s *outputSuite) TestFlushSinkReleaseFlowController(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
cfg := config.GetDefaultReplicaConfig()
cfg.EnableOldValue = false
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: "changefeed-id-test-flushSink",
Info: &model.ChangeFeedInfo{
StartTs: oracle.GoTimeToTS(time.Now()),
Config: cfg,
},
})
flowController := &flushFlowController{}
sink := &flushSink{}
// sNode is a sinkNode
sNode := newSinkNode(1, sink, 0, 10, flowController)
c.Assert(sNode.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
sNode.barrierTs = 10

cctx := pipeline.MockNodeContext4Test(nil, pipeline.TickMessage(), nil)
err := sNode.flushSink(cctx, uint64(8))
c.Assert(err, check.IsNil)
c.Assert(sNode.checkpointTs, check.Equals, uint64(8))
c.Assert(flowController.releaseCounter, check.Equals, 1)
// resolvedTs will fall back in this call
err = sNode.flushSink(cctx, uint64(10))
c.Assert(err, check.IsNil)
c.Assert(sNode.checkpointTs, check.Equals, uint64(8))
c.Assert(flowController.releaseCounter, check.Equals, 2)
}
5 changes: 0 additions & 5 deletions cdc/sink/black_hole.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,6 @@ func (b *blackHoleSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) e
return nil
}

// Initialize is no-op for blackhole
func (b *blackHoleSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
return nil
}

func (b *blackHoleSink) Close(ctx context.Context) error {
return nil
}
Expand Down
5 changes: 1 addition & 4 deletions cdc/sink/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func (m *Manager) getCheckpointTs(tableID model.TableID) uint64 {
return atomic.LoadUint64(&m.changeFeedCheckpointTs)
}

// UpdateChangeFeedCheckpointTs update the changeFeedCheckpointTs every processor tick
func (m *Manager) UpdateChangeFeedCheckpointTs(checkpointTs uint64) {
atomic.StoreUint64(&m.changeFeedCheckpointTs, checkpointTs)
if m.backendSink != nil {
Expand Down Expand Up @@ -235,10 +236,6 @@ func (t *tableSink) FlushRowChangedEvents(ctx context.Context, tableID model.Tab
return ckpt, err
}

func (t *tableSink) getEmittedTs() uint64 {
return atomic.LoadUint64(&t.emittedTs)
}

func (t *tableSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
// the table sink doesn't receive the checkpoint event
return nil
Expand Down
8 changes: 0 additions & 8 deletions cdc/sink/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@ func newCheckSink(c *check.C) *checkSink {
}
}

func (c *checkSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
panic("unreachable")
}

func (c *checkSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
c.rowsMu.Lock()
defer c.rowsMu.Unlock()
Expand Down Expand Up @@ -342,10 +338,6 @@ type errorSink struct {
*check.C
}

func (e *errorSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
panic("unreachable")
}

func (e *errorSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
return errors.New("error in emit row changed events")
}
Expand Down
16 changes: 7 additions & 9 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,6 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
return errors.Trace(err)
}

// Initialize registers Avro schemas for all tables
func (k *mqSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
// No longer need it for now
return nil
}

func (k *mqSink) Close(ctx context.Context) error {
err := k.mqProducer.Close()
return errors.Trace(err)
Expand Down Expand Up @@ -414,7 +408,11 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi
topic := strings.TrimFunc(sinkURI.Path, func(r rune) bool {
return r == '/'
})
producer, err := kafka.NewKafkaSaramaProducer(ctx, sinkURI.Host, topic, config, errCh)
if topic == "" {
return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("no topic is specified in sink-uri")
}

producer, err := kafka.NewKafkaSaramaProducer(ctx, topic, config, errCh)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -444,8 +442,8 @@ func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter,
if s != "" {
opts["max-batch-size"] = s
}
// For now, it's a place holder. Avro format have to make connection to Schema Registery,
// and it may needs credential.
// For now, it's a placeholder. Avro format have to make connection to Schema Registry,
// and it may need credential.
credential := &security.Credential{}
sink, err := newMqSink(ctx, credential, producer, filter, replicaConfig, opts, errCh)
if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions cdc/sink/mq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) {
c.Assert(err, check.IsNil)
opts := map[string]string{}
errCh := make(chan error, 1)

c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate")
}()

sink, err := newKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh)
c.Assert(err, check.IsNil)

Expand Down Expand Up @@ -161,6 +167,12 @@ func (s mqSinkSuite) TestKafkaSinkFilter(c *check.C) {
c.Assert(err, check.IsNil)
opts := map[string]string{}
errCh := make(chan error, 1)

c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate")
}()

sink, err := newKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh)
c.Assert(err, check.IsNil)

Expand Down
5 changes: 0 additions & 5 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,6 @@ func (s *mysqlSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
return errors.Trace(err)
}

// Initialize is no-op for Mysql sink
func (s *mysqlSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
return nil
}

func (s *mysqlSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEvent) error {
return retry.Do(ctx, func() error {
err := s.execDDL(ctx, ddl)
Expand Down
Loading

0 comments on commit 5834426

Please sign in to comment.