Commit 980ee71
remove dmlWriter
CharlesCheung96 committed Apr 20, 2023
1 parent 423979e commit 980ee71
Showing 9 changed files with 158 additions and 203 deletions.
68 changes: 43 additions & 25 deletions cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go
@@ -27,6 +27,8 @@ import (
"github.com/pingcap/tiflow/cdc/sink/metrics"
"github.com/pingcap/tiflow/cdc/sink/tablesink/state"
"github.com/pingcap/tiflow/cdc/sink/util"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink"
@@ -64,18 +66,19 @@ type eventFragment
// It will send the events to cloud storage systems.
type DMLSink struct {
changefeedID model.ChangeFeedID
// last sequence number
lastSeqNum uint64
// msgCh is a channel to hold eventFragment.
msgCh chan eventFragment
// encodingWorkers defines a group of workers for encoding events.
encodingWorkers []*encodingWorker
// defragmenter is used to defragment the out-of-order encoded messages.
defragmenter *defragmenter
// writer is a dmlWriter which manages a group of dmlWorkers and
// defragmenter is used to defragment the out-of-order encoded messages and
// sends encoded messages to individual dmlWorkers.
writer *dmlWriter
defragmenter *defragmenter
// workers defines a group of workers for writing events to external storage.
workers []*dmlWorker

statistics *metrics.Statistics
// last sequence number
lastSeqNum uint64

cancel func()
wg sync.WaitGroup
@@ -125,20 +128,30 @@ func NewDMLSink(ctx context.Context,
s := &DMLSink{
changefeedID: contextutil.ChangefeedIDFromCtx(wgCtx),
msgCh: make(chan eventFragment, defaultChannelSize),
encodingWorkers: make([]*encodingWorker, 0, defaultEncodingConcurrency),
defragmenter: newDefragmenter(wgCtx),
encodingWorkers: make([]*encodingWorker, defaultEncodingConcurrency),
workers: make([]*dmlWorker, cfg.WorkerCount),
statistics: metrics.NewStatistics(wgCtx, sink.TxnSink),
cancel: wgCancel,
dead: make(chan struct{}),
}
encodedCh := make(chan eventFragment, defaultChannelSize)
workerChannels := make([]*chann.DrainableChann[eventFragment], cfg.WorkerCount)

// create a group of encoding workers.
for i := 0; i < defaultEncodingConcurrency; i++ {
encoder := encoderBuilder.Build()
w := newEncodingWorker(i, s.changefeedID, encoder, s.msgCh, s.defragmenter)
s.encodingWorkers = append(s.encodingWorkers, w)
s.encodingWorkers[i] = newEncodingWorker(i, s.changefeedID, encoder, s.msgCh, encodedCh)
}
// create defragmenter.
s.defragmenter = newDefragmenter(encodedCh, workerChannels)
// create a group of dml workers.
clock := clock.New()
for i := 0; i < cfg.WorkerCount; i++ {
inputCh := chann.NewAutoDrainChann[eventFragment]()
s.workers[i] = newDMLWorker(i, s.changefeedID, storage, cfg, ext,
inputCh, clock, s.statistics)
workerChannels[i] = inputCh
}
s.writer = newDMLWriter(s.changefeedID, storage, cfg, ext, s.statistics, errCh)
s.defragmenter.outputFn = s.writer.dispatchFragToDMLWorker

s.wg.Add(1)
go func() {
@@ -159,14 +172,23 @@ func NewDMLSink(ctx context.Context,

func (s *DMLSink) run(ctx context.Context) error {
eg, ctx := errgroup.WithContext(ctx)
// run dml writer
eg.Go(func() error {
return s.writer.run(ctx)
})

// run the encoding workers.
for i := 0; i < defaultEncodingConcurrency; i++ {
worker := s.encodingWorkers[i]
encodingWorker := s.encodingWorkers[i]
eg.Go(func() error {
return encodingWorker.run(ctx)
})
}

// run the defragmenter.
eg.Go(func() error {
return s.defragmenter.run(ctx)
})

// run dml workers.
for i := 0; i < len(s.workers); i++ {
worker := s.workers[i]
eg.Go(func() error {
return worker.run(ctx)
})
@@ -212,16 +234,12 @@ func (s *DMLSink) Close() {
}
s.wg.Wait()

if s.defragmenter != nil {
s.defragmenter.close()
}

for _, w := range s.encodingWorkers {
w.close()
for _, encodingWorker := range s.encodingWorkers {
encodingWorker.close()
}

if s.writer != nil {
s.writer.close()
for _, worker := range s.workers {
worker.close()
}

if s.statistics != nil {
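For orientation, the new data path through this file is: msgCh -> encoding workers -> encodedCh -> defragmenter -> per-worker chann.DrainableChann queues -> dml workers. Below is a minimal, self-contained sketch of that fan-in shape, with toy ints and plain channels standing in for eventFragment and the chann package; it is an illustration, not tiflow code.

package main

import (
	"fmt"
	"sync"
)

func main() {
	msgCh := make(chan int, 8)     // stands in for DMLSink.msgCh
	encodedCh := make(chan int, 8) // stands in for the shared encodedCh

	// A group of "encoding" goroutines fan in to one encoded channel,
	// like the encoding workers created in NewDMLSink.
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range msgCh {
				encodedCh <- m * 10 // "encode" the fragment
			}
		}()
	}

	for i := 1; i <= 4; i++ {
		msgCh <- i
	}
	close(msgCh)
	go func() { wg.Wait(); close(encodedCh) }()

	for e := range encodedCh {
		fmt.Println("encoded:", e)
	}
}

Note that arrival order on encodedCh is nondeterministic across workers; restoring sequence order is exactly the defragmenter's job downstream.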
21 changes: 17 additions & 4 deletions cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go
@@ -34,6 +34,12 @@ import (
"github.com/stretchr/testify/require"
)

func setClock(s *DMLSink, clock clock.Clock) {
for _, w := range s.workers {
w.filePathGenerator.SetClock(clock)
}
}

func generateTxnEvents(
cnt *uint64,
batch int,
@@ -89,6 +95,8 @@ func generateTxnEvents(
}

func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
parentDir := t.TempDir()
uri := fmt.Sprintf("file:///%s?flush-interval=2s", parentDir)
@@ -159,6 +167,8 @@ func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) {
}

func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
parentDir := t.TempDir()
uri := fmt.Sprintf("file:///%s?flush-interval=2s", parentDir)
@@ -173,7 +183,7 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
s, err := NewDMLSink(ctx, sinkURI, replicaConfig, errCh)
require.Nil(t, err)
mockClock := clock.NewMock()
s.writer.setClock(mockClock)
setClock(s, mockClock)

var cnt uint64 = 0
batch := 100
@@ -205,7 +215,8 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {

// test date (day) is NOT changed.
mockClock.Set(time.Date(2023, 3, 8, 23, 59, 59, 0, time.UTC))
s.writer.setClock(mockClock)
setClock(s, mockClock)

err = s.WriteEvents(txns...)
require.Nil(t, err)
time.Sleep(3 * time.Second)
@@ -229,7 +240,8 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {

// test date (day) is changed.
mockClock.Set(time.Date(2023, 3, 9, 0, 0, 10, 0, time.UTC))
s.writer.setClock(mockClock)
setClock(s, mockClock)

err = s.WriteEvents(txns...)
require.Nil(t, err)
time.Sleep(3 * time.Second)
@@ -261,7 +273,8 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
require.Nil(t, err)
mockClock = clock.NewMock()
mockClock.Set(time.Date(2023, 3, 9, 0, 1, 10, 0, time.UTC))
s.writer.setClock(mockClock)
setClock(s, mockClock)

err = s.WriteEvents(txns...)
require.Nil(t, err)
time.Sleep(3 * time.Second)
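These tests drive time through a mock clock instead of the wall clock. A minimal sketch of the pattern, using github.com/benbjohnson/clock directly (an assumption: tiflow's engine/pkg/clock exposes an equivalent NewMock/Set API, as the calls above suggest):

package main

import (
	"fmt"
	"time"

	"github.com/benbjohnson/clock"
)

func main() {
	mock := clock.NewMock()
	// Pin the clock just before midnight, as the date-separator test does.
	mock.Set(time.Date(2023, 3, 8, 23, 59, 59, 0, time.UTC))
	fmt.Println(mock.Now().UTC()) // 2023-03-08 23:59:59 +0000 UTC

	// Cross the day boundary; a date-separated path generator reading
	// this clock would start emitting files under 2023-03-09/.
	mock.Add(11 * time.Second)
	fmt.Println(mock.Now().UTC()) // 2023-03-09 00:00:10 +0000 UTC
}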
60 changes: 34 additions & 26 deletions cdc/sink/dmlsink/cloudstorage/defragmenter.go
@@ -14,46 +14,44 @@ package cloudstorage

import (
"context"
"sync"

"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/hash"
)

// defragmenter is used to handle event fragments which can be registered
// out of order.
type defragmenter struct {
lastWritten uint64
future map[uint64]eventFragment
wg sync.WaitGroup
inputCh chan eventFragment

outputFn func(eventFragment)
inputCh <-chan eventFragment
outputChs []*chann.DrainableChann[eventFragment]
hasher *hash.PositionInertia
}

func newDefragmenter(ctx context.Context) *defragmenter {
d := &defragmenter{
future: make(map[uint64]eventFragment),
inputCh: make(chan eventFragment, defaultChannelSize),
func newDefragmenter(
inputCh <-chan eventFragment,
outputChs []*chann.DrainableChann[eventFragment],
) *defragmenter {
return &defragmenter{
future: make(map[uint64]eventFragment),
inputCh: inputCh,
outputChs: outputChs,
hasher: hash.NewPositionInertia(),
}
d.wg.Add(1)
go func() {
defer d.wg.Done()
d.defragMsgs(ctx)
}()
return d
}

func (d *defragmenter) registerFrag(frag eventFragment) {
d.inputCh <- frag
}

func (d *defragmenter) defragMsgs(ctx context.Context) {
func (d *defragmenter) run(ctx context.Context) error {
defer d.close()
for {
select {
case <-ctx.Done():
d.future = nil
return
return errors.Trace(ctx.Err())
case frag, ok := <-d.inputCh:
if !ok {
return
return nil
}
// check whether to write messages to output channel right now
next := d.lastWritten + 1
@@ -62,7 +60,7 @@ func (d *defragmenter) defragMsgs(ctx context.Context) {
} else if frag.seqNumber > next {
d.future[frag.seqNumber] = frag
} else {
return
return nil
}
}
}
@@ -72,7 +70,7 @@ func (d *defragmenter) writeMsgsConsecutive(
ctx context.Context,
start eventFragment,
) {
d.outputFn(start)
d.dispatchFragToDMLWorker(start)

d.lastWritten++
for {
@@ -85,14 +83,24 @@ func (d *defragmenter) writeMsgsConsecutive(
next := d.lastWritten + 1
if frag, ok := d.future[next]; ok {
delete(d.future, next)
d.outputFn(frag)
d.dispatchFragToDMLWorker(frag)
d.lastWritten = next
} else {
return
}
}
}

func (d *defragmenter) dispatchFragToDMLWorker(frag eventFragment) {
tableName := frag.versionedTable.TableNameWithPhysicTableID
d.hasher.Reset()
d.hasher.Write([]byte(tableName.Schema), []byte(tableName.Table))
workerID := d.hasher.Sum32() % uint32(len(d.outputChs))
d.outputChs[workerID].In() <- frag
}

func (d *defragmenter) close() {
d.wg.Wait()
for _, ch := range d.outputChs {
ch.CloseAndDrain()
}
}
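The invariant to take away from this file: fragments may arrive in any order, but are released strictly in seqNumber order, then routed to a worker chosen by hashing the table name. A self-contained toy of both steps (plain output and hash/fnv standing in for chann.DrainableChann and tiflow's hash.PositionInertia):

package main

import (
	"fmt"
	"hash/fnv"
)

type frag struct {
	seq           uint64
	schema, table string
}

// route picks a stable worker index for a table, as dispatchFragToDMLWorker
// does; fnv-1a is an illustrative stand-in for PositionInertia.
func route(f frag, workers int) uint32 {
	h := fnv.New32a()
	h.Write([]byte(f.schema))
	h.Write([]byte(f.table))
	return h.Sum32() % uint32(workers)
}

func main() {
	future := map[uint64]frag{} // buffered out-of-order fragments
	var lastWritten uint64

	emit := func(f frag) {
		fmt.Printf("emit seq=%d -> worker %d\n", f.seq, route(f, 4))
	}

	// Out-of-order arrivals: 3 and 2 wait in `future` until 1 unblocks them.
	for _, f := range []frag{{3, "test", "t1"}, {1, "test", "t1"}, {2, "test", "t2"}} {
		next := lastWritten + 1
		switch {
		case f.seq == next: // consecutive: emit, then drain the buffer
			emit(f)
			lastWritten = next
			for {
				nf, ok := future[lastWritten+1]
				if !ok {
					break
				}
				delete(future, lastWritten+1)
				emit(nf)
				lastWritten++
			}
		case f.seq > next: // early: park until predecessors arrive
			future[f.seq] = f
		}
	}
}

Because routing hashes only the schema and table name, every fragment of a given table lands on the same worker, so per-table ordering survives the fan-out.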
25 changes: 16 additions & 9 deletions cdc/sink/dmlsink/cloudstorage/defragmenter_test.go
@@ -26,20 +26,27 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink"
"github.com/pingcap/tiflow/cdc/sink/util"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/pingcap/tiflow/pkg/sink/codec/builder"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

func TestDeframenter(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
outputCh := make(chan eventFragment, 1024)
outputFn := func(frag eventFragment) {
outputCh <- frag
}
defrag := newDefragmenter(ctx)
defrag.outputFn = outputFn
eg, egCtx := errgroup.WithContext(ctx)

inputCh := make(chan eventFragment)
outputCh := chann.NewAutoDrainChann[eventFragment]()
defrag := newDefragmenter(inputCh, []*chann.DrainableChann[eventFragment]{outputCh})
eg.Go(func() error {
return defrag.run(egCtx)
})

uri := "file:///tmp/test"
txnCnt := 50
sinkURI, err := url.Parse(uri)
@@ -113,15 +120,15 @@ func TestDeframenter(t *testing.T) {
for _, msg := range frag.encodedMsgs {
msg.Key = []byte(strconv.Itoa(int(seq)))
}
defrag.registerFrag(frag)
inputCh <- frag
}(uint64(i + 1))
}

prevSeq := 0
LOOP:
for {
select {
case frag := <-outputCh:
case frag := <-outputCh.Out():
for _, msg := range frag.encodedMsgs {
curSeq, err := strconv.Atoi(string(msg.Key))
require.Nil(t, err)
@@ -133,5 +140,5 @@ LOOP:
}
}
cancel()
defrag.close()
require.ErrorIs(t, eg.Wait(), context.Canceled)
}
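The shutdown pattern this test settles on (cancel the context, then assert that eg.Wait() surfaces context.Canceled) replaces the old close()-based teardown and generalizes to any errgroup-managed pipeline. A minimal sketch:

package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	eg, egCtx := errgroup.WithContext(ctx)

	eg.Go(func() error {
		<-egCtx.Done() // a worker loop would select on this
		return egCtx.Err()
	})

	cancel()
	// Wait returns the first non-nil error returned by the group.
	fmt.Println(errors.Is(eg.Wait(), context.Canceled)) // true
}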
(5 more changed files not shown)
