sink(ticdc): remove redundant channel and dmlWriter in storage sink #8783

Merged · 3 commits · Apr 25, 2023
68 changes: 43 additions & 25 deletions cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go
@@ -27,6 +27,8 @@ import (
"github.com/pingcap/tiflow/cdc/sink/metrics"
"github.com/pingcap/tiflow/cdc/sink/tablesink/state"
"github.com/pingcap/tiflow/cdc/sink/util"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink"
@@ -64,18 +66,19 @@ type eventFragment struct {
// It will send the events to cloud storage systems.
type DMLSink struct {
changefeedID model.ChangeFeedID
// last sequence number
lastSeqNum uint64
// msgCh is a channel to hold eventFragment.
msgCh chan eventFragment
// encodingWorkers defines a group of workers for encoding events.
encodingWorkers []*encodingWorker
// defragmenter is used to defragment the out-of-order encoded messages.
defragmenter *defragmenter
// writer is a dmlWriter which manages a group of dmlWorkers and
// defragmenter is used to defragment the out-of-order encoded messages and
// sends encoded messages to individual dmlWorkers.
writer *dmlWriter
defragmenter *defragmenter
// workers defines a group of workers for writing events to external storage.
workers []*dmlWorker

statistics *metrics.Statistics
// last sequence number
lastSeqNum uint64

cancel func()
wg sync.WaitGroup
@@ -125,20 +128,30 @@ func NewDMLSink(ctx context.Context,
s := &DMLSink{
changefeedID: contextutil.ChangefeedIDFromCtx(wgCtx),
msgCh: make(chan eventFragment, defaultChannelSize),
encodingWorkers: make([]*encodingWorker, 0, defaultEncodingConcurrency),
defragmenter: newDefragmenter(wgCtx),
encodingWorkers: make([]*encodingWorker, defaultEncodingConcurrency),
workers: make([]*dmlWorker, cfg.WorkerCount),
statistics: metrics.NewStatistics(wgCtx, sink.TxnSink),
cancel: wgCancel,
dead: make(chan struct{}),
}
encodedCh := make(chan eventFragment, defaultChannelSize)
workerChannels := make([]*chann.DrainableChann[eventFragment], cfg.WorkerCount)

// create a group of encoding workers.
for i := 0; i < defaultEncodingConcurrency; i++ {
encoder := encoderBuilder.Build()
w := newEncodingWorker(i, s.changefeedID, encoder, s.msgCh, s.defragmenter)
s.encodingWorkers = append(s.encodingWorkers, w)
s.encodingWorkers[i] = newEncodingWorker(i, s.changefeedID, encoder, s.msgCh, encodedCh)
}
// create defragmenter.
s.defragmenter = newDefragmenter(encodedCh, workerChannels)
// create a group of dml workers.
clock := clock.New()
for i := 0; i < cfg.WorkerCount; i++ {
inputCh := chann.NewAutoDrainChann[eventFragment]()
s.workers[i] = newDMLWorker(i, s.changefeedID, storage, cfg, ext,
inputCh, clock, s.statistics)
workerChannels[i] = inputCh
}
orderedCh := s.defragmenter.orderedOut()
s.writer = newDMLWriter(s.changefeedID, storage, cfg, ext, s.statistics, orderedCh, errCh)

s.wg.Add(1)
go func() {
@@ -159,14 +172,23 @@ func NewDMLSink(ctx context.Context,

func (s *DMLSink) run(ctx context.Context) error {
eg, ctx := errgroup.WithContext(ctx)
// run dml writer
eg.Go(func() error {
return s.writer.run(ctx)
})

// run the encoding workers.
for i := 0; i < defaultEncodingConcurrency; i++ {
worker := s.encodingWorkers[i]
encodingWorker := s.encodingWorkers[i]
eg.Go(func() error {
return encodingWorker.run(ctx)
})
}

// run the defragmenter.
eg.Go(func() error {
return s.defragmenter.run(ctx)
})

// run dml workers.
for i := 0; i < len(s.workers); i++ {
worker := s.workers[i]
eg.Go(func() error {
return worker.run(ctx)
})
@@ -212,16 +234,12 @@ func (s *DMLSink) Close() {
}
s.wg.Wait()

if s.defragmenter != nil {
s.defragmenter.close()
}

for _, w := range s.encodingWorkers {
w.close()
for _, encodingWorker := range s.encodingWorkers {
encodingWorker.close()
}

if s.writer != nil {
s.writer.close()
for _, worker := range s.workers {
worker.close()
}

if s.statistics != nil {
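
For readers following the new wiring in NewDMLSink: the encoding workers fan in to a single encodedCh, the defragmenter restores sequence order, and ordered fragments are then routed to one drainable channel per dmlWorker, keyed by a hash of the table name, which is what makes the old dmlWriter layer redundant. Below is a minimal, dependency-free sketch of that fan-in / reorder / fan-out shape; the fragment type, the FNV hash, and the channel sizes are illustrative stand-ins, not the tiflow types (the real code uses eventFragment, hash.PositionInertia, and chann.DrainableChann).

package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// fragment stands in for eventFragment: a global sequence number plus the
// table it belongs to.
type fragment struct {
	seq   uint64
	table string
}

func main() {
	encoded := make(chan fragment, 16)  // fan-in channel fed by all encoding workers
	workers := make([]chan fragment, 2) // one input channel per dml worker
	for i := range workers {
		workers[i] = make(chan fragment, 16)
	}

	var wg sync.WaitGroup

	// Defragmenter: emit fragments in sequence order, then dispatch each one
	// to the worker that owns its table.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer func() {
			for _, ch := range workers {
				close(ch)
			}
		}()
		next, future := uint64(1), map[uint64]fragment{}
		dispatch := func(f fragment) {
			h := fnv.New32a()
			h.Write([]byte(f.table))
			workers[h.Sum32()%uint32(len(workers))] <- f
		}
		for f := range encoded {
			if f.seq != next {
				future[f.seq] = f // arrived early: hold until its turn
				continue
			}
			dispatch(f)
			next++
			for held, ok := future[next]; ok; held, ok = future[next] {
				delete(future, next)
				dispatch(held)
				next++
			}
		}
	}()

	// DML workers: drain their own channel and "write" to storage.
	for i, ch := range workers {
		wg.Add(1)
		go func(id int, in <-chan fragment) {
			defer wg.Done()
			for f := range in {
				fmt.Printf("worker %d wrote table=%s seq=%d\n", id, f.table, f.seq)
			}
		}(i, ch)
	}

	// Encoders finish out of order; the workers still see seq 1..4 in order.
	for _, seq := range []uint64{2, 1, 4, 3} {
		encoded <- fragment{seq: seq, table: "t1"}
	}
	close(encoded)
	wg.Wait()
}
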
21 changes: 17 additions & 4 deletions cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go
@@ -34,6 +34,12 @@ import (
"github.com/stretchr/testify/require"
)

func setClock(s *DMLSink, clock clock.Clock) {
for _, w := range s.workers {
w.filePathGenerator.SetClock(clock)
}
}

func generateTxnEvents(
cnt *uint64,
batch int,
Expand Down Expand Up @@ -89,6 +95,8 @@ func generateTxnEvents(
}

func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
parentDir := t.TempDir()
uri := fmt.Sprintf("file:///%s?flush-interval=2s", parentDir)
Expand Down Expand Up @@ -159,6 +167,8 @@ func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) {
}

func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
parentDir := t.TempDir()
uri := fmt.Sprintf("file:///%s?flush-interval=2s", parentDir)
@@ -173,7 +183,7 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
s, err := NewDMLSink(ctx, sinkURI, replicaConfig, errCh)
require.Nil(t, err)
mockClock := clock.NewMock()
s.writer.setClock(mockClock)
setClock(s, mockClock)

var cnt uint64 = 0
batch := 100
Expand Down Expand Up @@ -205,7 +215,8 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {

// test date (day) is NOT changed.
mockClock.Set(time.Date(2023, 3, 8, 23, 59, 59, 0, time.UTC))
s.writer.setClock(mockClock)
setClock(s, mockClock)

err = s.WriteEvents(txns...)
require.Nil(t, err)
time.Sleep(3 * time.Second)
@@ -229,7 +240,8 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {

// test date (day) is changed.
mockClock.Set(time.Date(2023, 3, 9, 0, 0, 10, 0, time.UTC))
s.writer.setClock(mockClock)
setClock(s, mockClock)

err = s.WriteEvents(txns...)
require.Nil(t, err)
time.Sleep(3 * time.Second)
Expand Down Expand Up @@ -261,7 +273,8 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
require.Nil(t, err)
mockClock = clock.NewMock()
mockClock.Set(time.Date(2023, 3, 9, 0, 1, 10, 0, time.UTC))
s.writer.setClock(mockClock)
setClock(s, mockClock)

err = s.WriteEvents(txns...)
require.Nil(t, err)
time.Sleep(3 * time.Second)
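
The setClock helper above exists so the tests can pin the wall clock that each worker's filePathGenerator uses for date-partitioned paths, instead of waiting for a real day to roll over. The following is a generic sketch of that clock-injection idea; the Clock interface, fixedClock, and pathGenerator types are hypothetical simplifications, not the engine/pkg/clock or cloudstorage file-path-generator APIs.

package main

import (
	"fmt"
	"time"
)

// Clock abstracts time.Now so tests can control the date deterministically.
type Clock interface {
	Now() time.Time
}

type realClock struct{}

func (realClock) Now() time.Time { return time.Now() }

// fixedClock plays the role of a mock clock: it returns whatever the test set.
type fixedClock struct{ t time.Time }

func (c fixedClock) Now() time.Time { return c.t }

// pathGenerator is a trimmed-down, hypothetical stand-in that only derives
// the date partition of an output path.
type pathGenerator struct{ clock Clock }

func (g *pathGenerator) SetClock(c Clock) { g.clock = c }

func (g *pathGenerator) DatePath(schema, table string) string {
	return fmt.Sprintf("%s/%s/%s", schema, table, g.clock.Now().Format("2006-01-02"))
}

func main() {
	g := &pathGenerator{clock: realClock{}}

	// Pin the clock just before and just after midnight to check that the
	// date partition rolls over, mirroring the dates used in the test above.
	g.SetClock(fixedClock{t: time.Date(2023, 3, 8, 23, 59, 59, 0, time.UTC)})
	fmt.Println(g.DatePath("test", "table1")) // test/table1/2023-03-08

	g.SetClock(fixedClock{t: time.Date(2023, 3, 9, 0, 0, 10, 0, time.UTC)})
	fmt.Println(g.DatePath("test", "table1")) // test/table1/2023-03-09
}
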
64 changes: 34 additions & 30 deletions cdc/sink/dmlsink/cloudstorage/defragmenter.go
@@ -14,50 +14,44 @@ package cloudstorage

import (
"context"
"sync"

"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/hash"
)

// defragmenter is used to handle event fragments which can be registered
// out of order.
type defragmenter struct {
lastWritten uint64
future map[uint64]eventFragment
wg sync.WaitGroup
inputCh chan eventFragment
outputCh chan eventFragment
inputCh <-chan eventFragment
outputChs []*chann.DrainableChann[eventFragment]
hasher *hash.PositionInertia
}

func newDefragmenter(ctx context.Context) *defragmenter {
d := &defragmenter{
future: make(map[uint64]eventFragment),
inputCh: make(chan eventFragment, defaultChannelSize),
outputCh: make(chan eventFragment, defaultChannelSize),
func newDefragmenter(
inputCh <-chan eventFragment,
outputChs []*chann.DrainableChann[eventFragment],
) *defragmenter {
return &defragmenter{
future: make(map[uint64]eventFragment),
inputCh: inputCh,
outputChs: outputChs,
hasher: hash.NewPositionInertia(),
}
d.wg.Add(1)
go func() {
defer d.wg.Done()
d.defragMsgs(ctx)
}()
return d
}

func (d *defragmenter) registerFrag(frag eventFragment) {
d.inputCh <- frag
}

func (d *defragmenter) orderedOut() <-chan eventFragment {
return d.outputCh
}

func (d *defragmenter) defragMsgs(ctx context.Context) {
func (d *defragmenter) run(ctx context.Context) error {
defer d.close()
for {
select {
case <-ctx.Done():
d.future = nil
return
return errors.Trace(ctx.Err())
case frag, ok := <-d.inputCh:
if !ok {
return
return nil
}
// check whether to write messages to output channel right now
next := d.lastWritten + 1
@@ -66,7 +60,7 @@ func (d *defragmenter) defragMsgs(ctx context.Context) {
} else if frag.seqNumber > next {
d.future[frag.seqNumber] = frag
} else {
return
return nil
}
}
}
@@ -76,7 +70,7 @@ func (d *defragmenter) writeMsgsConsecutive(
ctx context.Context,
start eventFragment,
) {
d.outputCh <- start
d.dispatchFragToDMLWorker(start)

d.lastWritten++
for {
@@ -89,14 +83,24 @@ func (d *defragmenter) writeMsgsConsecutive(
next := d.lastWritten + 1
if frag, ok := d.future[next]; ok {
delete(d.future, next)
d.outputCh <- frag
d.dispatchFragToDMLWorker(frag)
d.lastWritten = next
} else {
return
}
}
}

func (d *defragmenter) dispatchFragToDMLWorker(frag eventFragment) {
tableName := frag.versionedTable.TableNameWithPhysicTableID
d.hasher.Reset()
d.hasher.Write([]byte(tableName.Schema), []byte(tableName.Table))
workerID := d.hasher.Sum32() % uint32(len(d.outputChs))
d.outputChs[workerID].In() <- frag
}

func (d *defragmenter) close() {
d.wg.Wait()
for _, ch := range d.outputChs {
ch.CloseAndDrain()
}
}
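
The new dispatchFragToDMLWorker is what preserves ordering after the shared dmlWriter is removed: the defragmenter emits fragments in global sequence order, and because every fragment of a given table hashes to the same worker index, each table's files are still written in commit order while different tables can proceed on different workers in parallel. A dependency-free illustration of that routing invariant is below; FNV stands in here for the PositionInertia hasher used in the actual code.

package main

import (
	"fmt"
	"hash/fnv"
)

// workerFor mirrors dispatchFragToDMLWorker: hash schema and table, then
// take the result modulo the number of dml workers.
func workerFor(schema, table string, workers uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(schema))
	h.Write([]byte(table))
	return h.Sum32() % workers
}

func main() {
	const workers = 4
	// The same table always lands on the same worker, so its writes stay ordered.
	fmt.Println(workerFor("test", "table1", workers) == workerFor("test", "table1", workers)) // true
	// Different tables may land on different workers, which is where the
	// parallelism across cfg.WorkerCount comes from.
	fmt.Println(workerFor("test", "table1", workers), workerFor("test", "table2", workers))
}
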
20 changes: 16 additions & 4 deletions cdc/sink/dmlsink/cloudstorage/defragmenter_test.go
@@ -26,15 +26,27 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink"
"github.com/pingcap/tiflow/cdc/sink/util"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/pingcap/tiflow/pkg/sink/codec/builder"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

func TestDeframenter(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defrag := newDefragmenter(ctx)
eg, egCtx := errgroup.WithContext(ctx)

inputCh := make(chan eventFragment)
outputCh := chann.NewAutoDrainChann[eventFragment]()
defrag := newDefragmenter(inputCh, []*chann.DrainableChann[eventFragment]{outputCh})
eg.Go(func() error {
return defrag.run(egCtx)
})

uri := "file:///tmp/test"
txnCnt := 50
sinkURI, err := url.Parse(uri)
@@ -108,15 +120,15 @@ func TestDeframenter(t *testing.T) {
for _, msg := range frag.encodedMsgs {
msg.Key = []byte(strconv.Itoa(int(seq)))
}
defrag.registerFrag(frag)
inputCh <- frag
}(uint64(i + 1))
}

prevSeq := 0
LOOP:
for {
select {
case frag := <-defrag.orderedOut():
case frag := <-outputCh.Out():
for _, msg := range frag.encodedMsgs {
curSeq, err := strconv.Atoi(string(msg.Key))
require.Nil(t, err)
@@ -128,5 +140,5 @@ LOOP:
}
}
cancel()
defrag.close()
require.ErrorIs(t, eg.Wait(), context.Canceled)
}