Skip to content

Commit

Permalink
sink(ticdc): cherry pick sink bug fix to release 5.2 (#4083) (#4119)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Dec 30, 2021
1 parent f4048d8 commit 2ed77d8
Show file tree
Hide file tree
Showing 23 changed files with 984 additions and 727 deletions.
2 changes: 1 addition & 1 deletion cdc/owner/async_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (m *mockSink) Close(ctx context.Context) error {
return nil
}

func (m *mockSink) Barrier(ctx context.Context) error {
func (m *mockSink) Barrier(ctx context.Context, tableID model.TableID) error {
return nil
}

Expand Down
10 changes: 6 additions & 4 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ func (s *TableStatus) Store(new TableStatus) {
}

type sinkNode struct {
sink sink.Sink
status TableStatus
sink sink.Sink
status TableStatus
tableID model.TableID

resolvedTs model.Ts
checkpointTs model.Ts
Expand All @@ -78,8 +79,9 @@ type sinkNode struct {
flowController tableFlowController
}

func newSinkNode(sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode {
func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode {
return &sinkNode{
tableID: tableID,
sink: sink,
status: TableStatusInitializing,
targetTs: targetTs,
Expand Down Expand Up @@ -136,7 +138,7 @@ func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err
if err := n.emitRow2Sink(ctx); err != nil {
return errors.Trace(err)
}
checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, resolvedTs)
checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolvedTs)
if err != nil {
return errors.Trace(err)
}
Expand Down
20 changes: 10 additions & 10 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (s *mockSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
panic("unreachable")
}

func (s *mockSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
func (s *mockSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, resolvedTs uint64) (uint64, error) {
s.received = append(s.received, struct {
resolvedTs model.Ts
row *model.RowChangedEvent
Expand All @@ -92,7 +92,7 @@ func (s *mockSink) Close(ctx context.Context) error {
return nil
}

func (s *mockSink) Barrier(ctx context.Context) error {
func (s *mockSink) Barrier(ctx context.Context, tableID model.TableID) error {
return nil
}

Expand Down Expand Up @@ -137,7 +137,7 @@ func (s *outputSuite) TestStatus(c *check.C) {
})

// test stop at targetTs
node := newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

Expand All @@ -163,7 +163,7 @@ func (s *outputSuite) TestStatus(c *check.C) {
c.Assert(node.CheckpointTs(), check.Equals, uint64(10))

// test the stop at ts command
node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

Expand All @@ -186,7 +186,7 @@ func (s *outputSuite) TestStatus(c *check.C) {
c.Assert(node.CheckpointTs(), check.Equals, uint64(2))

// test the stop at ts command is after then resolvedTs and checkpointTs is greater than stop ts
node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

Expand Down Expand Up @@ -223,7 +223,7 @@ func (s *outputSuite) TestStopStatus(c *check.C) {
})

closeCh := make(chan interface{}, 1)
node := newSinkNode(&mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{})
node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)
c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx,
Expand Down Expand Up @@ -258,7 +258,7 @@ func (s *outputSuite) TestManyTs(c *check.C) {
},
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

Expand Down Expand Up @@ -379,7 +379,7 @@ func (s *outputSuite) TestIgnoreEmptyRowChangeEvent(c *check.C) {
},
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)

// empty row, no Columns and PreColumns.
Expand All @@ -399,7 +399,7 @@ func (s *outputSuite) TestSplitUpdateEventWhenEnableOldValue(c *check.C) {
},
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)

// nil row.
Expand Down Expand Up @@ -458,7 +458,7 @@ func (s *outputSuite) TestSplitUpdateEventWhenDisableOldValue(c *check.C) {
},
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)

// nil row.
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func NewTablePipeline(ctx cdcContext.Context,
if cyclicEnabled {
p.AppendNode(ctx, "cyclic", newCyclicMarkNode(replicaInfo.MarkTableID))
}
tablePipeline.sinkNode = newSinkNode(sink, replicaInfo.StartTs, targetTs, flowController)
tablePipeline.sinkNode = newSinkNode(tableID, sink, replicaInfo.StartTs, targetTs, flowController)
p.AppendNode(ctx, "sink", tablePipeline.sinkNode)
tablePipeline.p = p
return tablePipeline
Expand Down
2 changes: 2 additions & 0 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ func (p *processor) tick(ctx cdcContext.Context, state *model.ChangefeedReactorS
if err := p.lazyInit(ctx); err != nil {
return nil, errors.Trace(err)
}
// sink manager will return this checkpointTs to sink node if sink node resolvedTs flush failed
p.sinkManager.UpdateChangeFeedCheckpointTs(state.Info.GetCheckpointTs(state.Status))
if err := p.handleTableOperation(ctx); err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 2 additions & 0 deletions cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
tablepipeline "github.com/pingcap/tiflow/cdc/processor/pipeline"
"github.com/pingcap/tiflow/cdc/sink"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
Expand All @@ -47,6 +48,7 @@ func newProcessor4Test(
) *processor {
p := newProcessor(ctx)
p.lazyInit = func(ctx cdcContext.Context) error { return nil }
p.sinkManager = &sink.Manager{}
p.createTablePipeline = createTablePipeline
p.schemaStorage = &mockSchemaStorage{c: c}
return p
Expand Down
6 changes: 2 additions & 4 deletions cdc/sink/black_hole.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ func newBlackHoleSink(ctx context.Context, opts map[string]string) *blackHoleSin

type blackHoleSink struct {
statistics *Statistics
checkpointTs uint64
accumulated uint64
lastAccumulated uint64
}
Expand All @@ -46,7 +45,7 @@ func (b *blackHoleSink) EmitRowChangedEvents(ctx context.Context, rows ...*model
return nil
}

func (b *blackHoleSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
func (b *blackHoleSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, resolvedTs uint64) (uint64, error) {
log.Debug("BlockHoleSink: FlushRowChangedEvents", zap.Uint64("resolvedTs", resolvedTs))
err := b.statistics.RecordBatchExecution(func() (int, error) {
// TODO: add some random replication latency
Expand All @@ -56,7 +55,6 @@ func (b *blackHoleSink) FlushRowChangedEvents(ctx context.Context, resolvedTs ui
return int(batchSize), nil
})
b.statistics.PrintStatus(ctx)
atomic.StoreUint64(&b.checkpointTs, resolvedTs)
return resolvedTs, err
}

Expand All @@ -79,6 +77,6 @@ func (b *blackHoleSink) Close(ctx context.Context) error {
return nil
}

func (b *blackHoleSink) Barrier(ctx context.Context) error {
func (b *blackHoleSink) Barrier(ctx context.Context, tableID model.TableID) error {
return nil
}
91 changes: 91 additions & 0 deletions cdc/sink/buffer_sink_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package sink

import (
"context"
"testing"
"time"

"github.com/pingcap/tiflow/cdc/model"
"github.com/stretchr/testify/require"
)

func TestTableIsNotFlushed(t *testing.T) {
b := bufferSink{changeFeedCheckpointTs: 1}
require.Equal(t, uint64(1), b.getTableCheckpointTs(2))
b.UpdateChangeFeedCheckpointTs(3)
require.Equal(t, uint64(3), b.getTableCheckpointTs(2))
}

func TestFlushTable(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer func() {
cancel()
}()
b := newBufferSink(ctx, newBlackHoleSink(ctx, make(map[string]string)), make(chan error), 5, make(chan drawbackMsg))
require.Equal(t, uint64(5), b.getTableCheckpointTs(2))
require.Nil(t, b.EmitRowChangedEvents(ctx))
tbl1 := &model.TableName{TableID: 1}
tbl2 := &model.TableName{TableID: 2}
tbl3 := &model.TableName{TableID: 3}
tbl4 := &model.TableName{TableID: 4}
require.Nil(t, b.EmitRowChangedEvents(ctx, []*model.RowChangedEvent{
{CommitTs: 6, Table: tbl1},
{CommitTs: 6, Table: tbl2},
{CommitTs: 6, Table: tbl3},
{CommitTs: 6, Table: tbl4},
{CommitTs: 10, Table: tbl1},
{CommitTs: 10, Table: tbl2},
{CommitTs: 10, Table: tbl3},
{CommitTs: 10, Table: tbl4},
}...))
checkpoint, err := b.FlushRowChangedEvents(ctx, 1, 7)
require.True(t, checkpoint <= 7)
require.Nil(t, err)
checkpoint, err = b.FlushRowChangedEvents(ctx, 2, 6)
require.True(t, checkpoint <= 6)
require.Nil(t, err)
checkpoint, err = b.FlushRowChangedEvents(ctx, 3, 8)
require.True(t, checkpoint <= 8)
require.Nil(t, err)
time.Sleep(200 * time.Millisecond)
require.Equal(t, uint64(7), b.getTableCheckpointTs(1))
require.Equal(t, uint64(6), b.getTableCheckpointTs(2))
require.Equal(t, uint64(8), b.getTableCheckpointTs(3))
require.Equal(t, uint64(5), b.getTableCheckpointTs(4))
b.UpdateChangeFeedCheckpointTs(6)
require.Equal(t, uint64(7), b.getTableCheckpointTs(1))
require.Equal(t, uint64(6), b.getTableCheckpointTs(2))
require.Equal(t, uint64(8), b.getTableCheckpointTs(3))
require.Equal(t, uint64(6), b.getTableCheckpointTs(4))
}

func TestFlushFailed(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
b := newBufferSink(ctx, newBlackHoleSink(ctx, make(map[string]string)), make(chan error), 5, make(chan drawbackMsg))
checkpoint, err := b.FlushRowChangedEvents(ctx, 3, 8)
require.True(t, checkpoint <= 8)
require.Nil(t, err)
time.Sleep(200 * time.Millisecond)
require.Equal(t, uint64(8), b.getTableCheckpointTs(3))
cancel()
checkpoint, _ = b.FlushRowChangedEvents(ctx, 3, 18)
require.Equal(t, uint64(8), checkpoint)
checkpoint, _ = b.FlushRowChangedEvents(ctx, 1, 18)
require.Equal(t, uint64(5), checkpoint)
time.Sleep(200 * time.Millisecond)
require.Equal(t, uint64(8), b.getTableCheckpointTs(3))
require.Equal(t, uint64(5), b.getTableCheckpointTs(1))
}
4 changes: 2 additions & 2 deletions cdc/sink/cdclog/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (f *fileSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowC
return f.emitRowChangedEvents(ctx, newTableStream, rows...)
}

func (f *fileSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
func (f *fileSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
log.Debug("[FlushRowChangedEvents] enter", zap.Uint64("ts", resolvedTs))
return f.flushRowChangedEvents(ctx, resolvedTs)
}
Expand Down Expand Up @@ -349,7 +349,7 @@ func (f *fileSink) Close(ctx context.Context) error {
return nil
}

func (f *fileSink) Barrier(ctx context.Context) error {
func (f *fileSink) Barrier(ctx context.Context, tableID model.TableID) error {
// Barrier does nothing because FlushRowChangedEvents in file sink has flushed
// all buffered events forcedlly.
return nil
Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/cdclog/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (s *s3Sink) flushLogMeta(ctx context.Context) error {
return cerror.WrapError(cerror.ErrS3SinkWriteStorage, s.storage.WriteFile(ctx, logMetaFile, data))
}

func (s *s3Sink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
func (s *s3Sink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
// we should flush all events before resolvedTs, there are two kind of flush policy
// 1. flush row events to a s3 chunk: if the event size is not enough,
// TODO: when cdc crashed, we should repair these chunks to a complete file
Expand Down Expand Up @@ -347,7 +347,7 @@ func (s *s3Sink) Close(ctx context.Context) error {
return nil
}

func (s *s3Sink) Barrier(ctx context.Context) error {
func (s *s3Sink) Barrier(ctx context.Context, tableID model.TableID) error {
// Barrier does nothing because FlushRowChangedEvents in s3 sink has flushed
// all buffered events forcedlly.
return nil
Expand Down
33 changes: 12 additions & 21 deletions cdc/sink/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package common
import (
"sort"
"sync"
"sync/atomic"

"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -55,7 +54,6 @@ func (t *txnsWithTheSameCommitTs) Append(row *model.RowChangedEvent) {
type UnresolvedTxnCache struct {
unresolvedTxnsMu sync.Mutex
unresolvedTxns map[model.TableID][]*txnsWithTheSameCommitTs
checkpointTs uint64
}

// NewUnresolvedTxnCache returns a new UnresolvedTxnCache
Expand Down Expand Up @@ -103,32 +101,27 @@ func (c *UnresolvedTxnCache) Append(filter *filter.Filter, rows ...*model.RowCha

// Resolved returns resolved txns according to resolvedTs
// The returned map contains many txns grouped by tableID. for each table, the each commitTs of txn in txns slice is strictly increasing
func (c *UnresolvedTxnCache) Resolved(resolvedTs uint64) map[model.TableID][]*model.SingleTableTxn {
if resolvedTs <= atomic.LoadUint64(&c.checkpointTs) {
return nil
}

func (c *UnresolvedTxnCache) Resolved(resolvedTsMap *sync.Map) (map[model.TableID]uint64, map[model.TableID][]*model.SingleTableTxn) {
c.unresolvedTxnsMu.Lock()
defer c.unresolvedTxnsMu.Unlock()
if len(c.unresolvedTxns) == 0 {
return nil
return nil, nil
}

_, resolvedTxnsMap := splitResolvedTxn(resolvedTs, c.unresolvedTxns)
return resolvedTxnsMap
}

// UpdateCheckpoint updates the checkpoint ts
func (c *UnresolvedTxnCache) UpdateCheckpoint(checkpointTs uint64) {
atomic.StoreUint64(&c.checkpointTs, checkpointTs)
return splitResolvedTxn(resolvedTsMap, c.unresolvedTxns)
}

func splitResolvedTxn(
resolvedTs uint64, unresolvedTxns map[model.TableID][]*txnsWithTheSameCommitTs,
) (minTs uint64, resolvedRowsMap map[model.TableID][]*model.SingleTableTxn) {
resolvedTsMap *sync.Map, unresolvedTxns map[model.TableID][]*txnsWithTheSameCommitTs,
) (flushedResolvedTsMap map[model.TableID]uint64, resolvedRowsMap map[model.TableID][]*model.SingleTableTxn) {
resolvedRowsMap = make(map[model.TableID][]*model.SingleTableTxn, len(unresolvedTxns))
minTs = resolvedTs
flushedResolvedTsMap = make(map[model.TableID]uint64, len(unresolvedTxns))
for tableID, txns := range unresolvedTxns {
v, ok := resolvedTsMap.Load(tableID)
if !ok {
continue
}
resolvedTs := v.(uint64)
i := sort.Search(len(txns), func(i int) bool {
return txns[i].commitTs > resolvedTs
})
Expand All @@ -154,9 +147,7 @@ func splitResolvedTxn(
}
}
resolvedRowsMap[tableID] = resolvedTxns
if len(resolvedTxnsWithTheSameCommitTs) > 0 && resolvedTxnsWithTheSameCommitTs[0].commitTs < minTs {
minTs = resolvedTxnsWithTheSameCommitTs[0].commitTs
}
flushedResolvedTsMap[tableID] = resolvedTs
}
return
}
Loading

0 comments on commit 2ed77d8

Please sign in to comment.