From 423979e1573ae010ec2698fb0b4b36f2d257c355 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Mon, 3 Apr 2023 20:04:09 +0800 Subject: [PATCH 1/2] remove redundant channel --- .../cloudstorage/cloud_storage_dml_sink.go | 4 +-- cdc/sink/dmlsink/cloudstorage/defragmenter.go | 16 ++++------ .../dmlsink/cloudstorage/defragmenter_test.go | 7 ++++- cdc/sink/dmlsink/cloudstorage/dml_writer.go | 29 ++++--------------- 4 files changed, 20 insertions(+), 36 deletions(-) diff --git a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go index 3b76cfa6f6a..07b9da4053b 100644 --- a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go +++ b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go @@ -137,8 +137,8 @@ func NewDMLSink(ctx context.Context, w := newEncodingWorker(i, s.changefeedID, encoder, s.msgCh, s.defragmenter) s.encodingWorkers = append(s.encodingWorkers, w) } - orderedCh := s.defragmenter.orderedOut() - s.writer = newDMLWriter(s.changefeedID, storage, cfg, ext, s.statistics, orderedCh, errCh) + s.writer = newDMLWriter(s.changefeedID, storage, cfg, ext, s.statistics, errCh) + s.defragmenter.outputFn = s.writer.dispatchFragToDMLWorker s.wg.Add(1) go func() { diff --git a/cdc/sink/dmlsink/cloudstorage/defragmenter.go b/cdc/sink/dmlsink/cloudstorage/defragmenter.go index 7d898037a3d..4ae28559320 100644 --- a/cdc/sink/dmlsink/cloudstorage/defragmenter.go +++ b/cdc/sink/dmlsink/cloudstorage/defragmenter.go @@ -24,14 +24,14 @@ type defragmenter struct { future map[uint64]eventFragment wg sync.WaitGroup inputCh chan eventFragment - outputCh chan eventFragment + + outputFn func(eventFragment) } func newDefragmenter(ctx context.Context) *defragmenter { d := &defragmenter{ - future: make(map[uint64]eventFragment), - inputCh: make(chan eventFragment, defaultChannelSize), - outputCh: make(chan eventFragment, defaultChannelSize), + future: make(map[uint64]eventFragment), + inputCh: make(chan eventFragment, defaultChannelSize), } d.wg.Add(1) go func() { @@ -45,10 +45,6 @@ func (d *defragmenter) registerFrag(frag eventFragment) { d.inputCh <- frag } -func (d *defragmenter) orderedOut() <-chan eventFragment { - return d.outputCh -} - func (d *defragmenter) defragMsgs(ctx context.Context) { for { select { @@ -76,7 +72,7 @@ func (d *defragmenter) writeMsgsConsecutive( ctx context.Context, start eventFragment, ) { - d.outputCh <- start + d.outputFn(start) d.lastWritten++ for { @@ -89,7 +85,7 @@ func (d *defragmenter) writeMsgsConsecutive( next := d.lastWritten + 1 if frag, ok := d.future[next]; ok { delete(d.future, next) - d.outputCh <- frag + d.outputFn(frag) d.lastWritten = next } else { return diff --git a/cdc/sink/dmlsink/cloudstorage/defragmenter_test.go b/cdc/sink/dmlsink/cloudstorage/defragmenter_test.go index 4be79933f2e..5c59384836c 100644 --- a/cdc/sink/dmlsink/cloudstorage/defragmenter_test.go +++ b/cdc/sink/dmlsink/cloudstorage/defragmenter_test.go @@ -34,7 +34,12 @@ import ( func TestDeframenter(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) + outputCh := make(chan eventFragment, 1024) + outputFn := func(frag eventFragment) { + outputCh <- frag + } defrag := newDefragmenter(ctx) + defrag.outputFn = outputFn uri := "file:///tmp/test" txnCnt := 50 sinkURI, err := url.Parse(uri) @@ -116,7 +121,7 @@ func TestDeframenter(t *testing.T) { LOOP: for { select { - case frag := <-defrag.orderedOut(): + case frag := <-outputCh: for _, msg := range frag.encodedMsgs { curSeq, err := strconv.Atoi(string(msg.Key)) require.Nil(t, err) diff --git a/cdc/sink/dmlsink/cloudstorage/dml_writer.go b/cdc/sink/dmlsink/cloudstorage/dml_writer.go index e079bc773c9..8a9a4580eef 100644 --- a/cdc/sink/dmlsink/cloudstorage/dml_writer.go +++ b/cdc/sink/dmlsink/cloudstorage/dml_writer.go @@ -15,7 +15,6 @@ package cloudstorage import ( "context" - "github.com/pingcap/errors" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/metrics" @@ -38,7 +37,6 @@ type dmlWriter struct { extension string clock clock.Clock statistics *metrics.Statistics - inputCh <-chan eventFragment errCh chan<- error } @@ -48,7 +46,6 @@ func newDMLWriter( config *cloudstorage.Config, extension string, statistics *metrics.Statistics, - inputCh <-chan eventFragment, errCh chan<- error, ) *dmlWriter { d := &dmlWriter{ @@ -61,7 +58,6 @@ func newDMLWriter( config: config, extension: extension, statistics: statistics, - inputCh: inputCh, errCh: errCh, } @@ -83,9 +79,6 @@ func (d *dmlWriter) setClock(clock clock.Clock) { func (d *dmlWriter) run(ctx context.Context) error { eg, ctx := errgroup.WithContext(ctx) - eg.Go(func() error { - return d.dispatchFragToDMLWorker(ctx) - }) for i := 0; i < d.config.WorkerCount; i++ { worker := d.workers[i] @@ -98,22 +91,12 @@ func (d *dmlWriter) run(ctx context.Context) error { return eg.Wait() } -func (d *dmlWriter) dispatchFragToDMLWorker(ctx context.Context) error { - for { - select { - case <-ctx.Done(): - return errors.Trace(ctx.Err()) - case frag, ok := <-d.inputCh: - if !ok { - return nil - } - tableName := frag.versionedTable.TableNameWithPhysicTableID - d.hasher.Reset() - d.hasher.Write([]byte(tableName.Schema), []byte(tableName.Table)) - workerID := d.hasher.Sum32() % uint32(d.config.WorkerCount) - d.workerChannels[workerID].In() <- frag - } - } +func (d *dmlWriter) dispatchFragToDMLWorker(frag eventFragment) { + tableName := frag.versionedTable.TableNameWithPhysicTableID + d.hasher.Reset() + d.hasher.Write([]byte(tableName.Schema), []byte(tableName.Table)) + workerID := d.hasher.Sum32() % uint32(d.config.WorkerCount) + d.workerChannels[workerID].In() <- frag } func (d *dmlWriter) close() { From 980ee71d4cc1ff9b0c15015f2dece7be2855287b Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Thu, 20 Apr 2023 21:10:39 +0800 Subject: [PATCH 2/2] remove dmlWriter --- .../cloudstorage/cloud_storage_dml_sink.go | 68 +++++++---- .../cloud_storage_dml_sink_test.go | 21 +++- cdc/sink/dmlsink/cloudstorage/defragmenter.go | 60 +++++----- .../dmlsink/cloudstorage/defragmenter_test.go | 25 ++-- cdc/sink/dmlsink/cloudstorage/dml_worker.go | 12 +- .../dmlsink/cloudstorage/dml_worker_test.go | 8 +- cdc/sink/dmlsink/cloudstorage/dml_writer.go | 109 ------------------ .../dmlsink/cloudstorage/encoding_worker.go | 12 +- .../cloudstorage/encoding_worker_test.go | 46 +++++--- 9 files changed, 158 insertions(+), 203 deletions(-) delete mode 100644 cdc/sink/dmlsink/cloudstorage/dml_writer.go diff --git a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go index 07b9da4053b..507efa879f1 100644 --- a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go +++ b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go @@ -27,6 +27,8 @@ import ( "github.com/pingcap/tiflow/cdc/sink/metrics" "github.com/pingcap/tiflow/cdc/sink/tablesink/state" "github.com/pingcap/tiflow/cdc/sink/util" + "github.com/pingcap/tiflow/engine/pkg/clock" + "github.com/pingcap/tiflow/pkg/chann" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink" @@ -64,18 +66,19 @@ type eventFragment struct { // It will send the events to cloud storage systems. type DMLSink struct { changefeedID model.ChangeFeedID + // last sequence number + lastSeqNum uint64 // msgCh is a channel to hold eventFragment. msgCh chan eventFragment // encodingWorkers defines a group of workers for encoding events. encodingWorkers []*encodingWorker - // defragmenter is used to defragment the out-of-order encoded messages. - defragmenter *defragmenter - // writer is a dmlWriter which manages a group of dmlWorkers and + // defragmenter is used to defragment the out-of-order encoded messages and // sends encoded messages to individual dmlWorkers. - writer *dmlWriter + defragmenter *defragmenter + // workers defines a group of workers for writing events to external storage. + workers []*dmlWorker + statistics *metrics.Statistics - // last sequence number - lastSeqNum uint64 cancel func() wg sync.WaitGroup @@ -125,20 +128,30 @@ func NewDMLSink(ctx context.Context, s := &DMLSink{ changefeedID: contextutil.ChangefeedIDFromCtx(wgCtx), msgCh: make(chan eventFragment, defaultChannelSize), - encodingWorkers: make([]*encodingWorker, 0, defaultEncodingConcurrency), - defragmenter: newDefragmenter(wgCtx), + encodingWorkers: make([]*encodingWorker, defaultEncodingConcurrency), + workers: make([]*dmlWorker, cfg.WorkerCount), statistics: metrics.NewStatistics(wgCtx, sink.TxnSink), cancel: wgCancel, dead: make(chan struct{}), } + encodedCh := make(chan eventFragment, defaultChannelSize) + workerChannels := make([]*chann.DrainableChann[eventFragment], cfg.WorkerCount) + // create a group of encoding workers. for i := 0; i < defaultEncodingConcurrency; i++ { encoder := encoderBuilder.Build() - w := newEncodingWorker(i, s.changefeedID, encoder, s.msgCh, s.defragmenter) - s.encodingWorkers = append(s.encodingWorkers, w) + s.encodingWorkers[i] = newEncodingWorker(i, s.changefeedID, encoder, s.msgCh, encodedCh) + } + // create defragmenter. + s.defragmenter = newDefragmenter(encodedCh, workerChannels) + // create a group of dml workers. + clock := clock.New() + for i := 0; i < cfg.WorkerCount; i++ { + inputCh := chann.NewAutoDrainChann[eventFragment]() + s.workers[i] = newDMLWorker(i, s.changefeedID, storage, cfg, ext, + inputCh, clock, s.statistics) + workerChannels[i] = inputCh } - s.writer = newDMLWriter(s.changefeedID, storage, cfg, ext, s.statistics, errCh) - s.defragmenter.outputFn = s.writer.dispatchFragToDMLWorker s.wg.Add(1) go func() { @@ -159,14 +172,23 @@ func NewDMLSink(ctx context.Context, func (s *DMLSink) run(ctx context.Context) error { eg, ctx := errgroup.WithContext(ctx) - // run dml writer - eg.Go(func() error { - return s.writer.run(ctx) - }) // run the encoding workers. for i := 0; i < defaultEncodingConcurrency; i++ { - worker := s.encodingWorkers[i] + encodingWorker := s.encodingWorkers[i] + eg.Go(func() error { + return encodingWorker.run(ctx) + }) + } + + // run the defragmenter. + eg.Go(func() error { + return s.defragmenter.run(ctx) + }) + + // run dml workers. + for i := 0; i < len(s.workers); i++ { + worker := s.workers[i] eg.Go(func() error { return worker.run(ctx) }) @@ -212,16 +234,12 @@ func (s *DMLSink) Close() { } s.wg.Wait() - if s.defragmenter != nil { - s.defragmenter.close() - } - - for _, w := range s.encodingWorkers { - w.close() + for _, encodingWorker := range s.encodingWorkers { + encodingWorker.close() } - if s.writer != nil { - s.writer.close() + for _, worker := range s.workers { + worker.close() } if s.statistics != nil { diff --git a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go index faa835be759..92b03619092 100644 --- a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go +++ b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go @@ -34,6 +34,12 @@ import ( "github.com/stretchr/testify/require" ) +func setClock(s *DMLSink, clock clock.Clock) { + for _, w := range s.workers { + w.filePathGenerator.SetClock(clock) + } +} + func generateTxnEvents( cnt *uint64, batch int, @@ -89,6 +95,8 @@ func generateTxnEvents( } func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) parentDir := t.TempDir() uri := fmt.Sprintf("file:///%s?flush-interval=2s", parentDir) @@ -159,6 +167,8 @@ func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) { } func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) parentDir := t.TempDir() uri := fmt.Sprintf("file:///%s?flush-interval=2s", parentDir) @@ -173,7 +183,7 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) { s, err := NewDMLSink(ctx, sinkURI, replicaConfig, errCh) require.Nil(t, err) mockClock := clock.NewMock() - s.writer.setClock(mockClock) + setClock(s, mockClock) var cnt uint64 = 0 batch := 100 @@ -205,7 +215,8 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) { // test date (day) is NOT changed. mockClock.Set(time.Date(2023, 3, 8, 23, 59, 59, 0, time.UTC)) - s.writer.setClock(mockClock) + setClock(s, mockClock) + err = s.WriteEvents(txns...) require.Nil(t, err) time.Sleep(3 * time.Second) @@ -229,7 +240,8 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) { // test date (day) is changed. mockClock.Set(time.Date(2023, 3, 9, 0, 0, 10, 0, time.UTC)) - s.writer.setClock(mockClock) + setClock(s, mockClock) + err = s.WriteEvents(txns...) require.Nil(t, err) time.Sleep(3 * time.Second) @@ -261,7 +273,8 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) { require.Nil(t, err) mockClock = clock.NewMock() mockClock.Set(time.Date(2023, 3, 9, 0, 1, 10, 0, time.UTC)) - s.writer.setClock(mockClock) + setClock(s, mockClock) + err = s.WriteEvents(txns...) require.Nil(t, err) time.Sleep(3 * time.Second) diff --git a/cdc/sink/dmlsink/cloudstorage/defragmenter.go b/cdc/sink/dmlsink/cloudstorage/defragmenter.go index 4ae28559320..a637a531c2e 100644 --- a/cdc/sink/dmlsink/cloudstorage/defragmenter.go +++ b/cdc/sink/dmlsink/cloudstorage/defragmenter.go @@ -14,7 +14,10 @@ package cloudstorage import ( "context" - "sync" + + "github.com/pingcap/tiflow/pkg/chann" + "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/hash" ) // defragmenter is used to handle event fragments which can be registered @@ -22,38 +25,33 @@ import ( type defragmenter struct { lastWritten uint64 future map[uint64]eventFragment - wg sync.WaitGroup - inputCh chan eventFragment - - outputFn func(eventFragment) + inputCh <-chan eventFragment + outputChs []*chann.DrainableChann[eventFragment] + hasher *hash.PositionInertia } -func newDefragmenter(ctx context.Context) *defragmenter { - d := &defragmenter{ - future: make(map[uint64]eventFragment), - inputCh: make(chan eventFragment, defaultChannelSize), +func newDefragmenter( + inputCh <-chan eventFragment, + outputChs []*chann.DrainableChann[eventFragment], +) *defragmenter { + return &defragmenter{ + future: make(map[uint64]eventFragment), + inputCh: inputCh, + outputChs: outputChs, + hasher: hash.NewPositionInertia(), } - d.wg.Add(1) - go func() { - defer d.wg.Done() - d.defragMsgs(ctx) - }() - return d -} - -func (d *defragmenter) registerFrag(frag eventFragment) { - d.inputCh <- frag } -func (d *defragmenter) defragMsgs(ctx context.Context) { +func (d *defragmenter) run(ctx context.Context) error { + defer d.close() for { select { case <-ctx.Done(): d.future = nil - return + return errors.Trace(ctx.Err()) case frag, ok := <-d.inputCh: if !ok { - return + return nil } // check whether to write messages to output channel right now next := d.lastWritten + 1 @@ -62,7 +60,7 @@ func (d *defragmenter) defragMsgs(ctx context.Context) { } else if frag.seqNumber > next { d.future[frag.seqNumber] = frag } else { - return + return nil } } } @@ -72,7 +70,7 @@ func (d *defragmenter) writeMsgsConsecutive( ctx context.Context, start eventFragment, ) { - d.outputFn(start) + d.dispatchFragToDMLWorker(start) d.lastWritten++ for { @@ -85,7 +83,7 @@ func (d *defragmenter) writeMsgsConsecutive( next := d.lastWritten + 1 if frag, ok := d.future[next]; ok { delete(d.future, next) - d.outputFn(frag) + d.dispatchFragToDMLWorker(frag) d.lastWritten = next } else { return @@ -93,6 +91,16 @@ func (d *defragmenter) writeMsgsConsecutive( } } +func (d *defragmenter) dispatchFragToDMLWorker(frag eventFragment) { + tableName := frag.versionedTable.TableNameWithPhysicTableID + d.hasher.Reset() + d.hasher.Write([]byte(tableName.Schema), []byte(tableName.Table)) + workerID := d.hasher.Sum32() % uint32(len(d.outputChs)) + d.outputChs[workerID].In() <- frag +} + func (d *defragmenter) close() { - d.wg.Wait() + for _, ch := range d.outputChs { + ch.CloseAndDrain() + } } diff --git a/cdc/sink/dmlsink/cloudstorage/defragmenter_test.go b/cdc/sink/dmlsink/cloudstorage/defragmenter_test.go index 5c59384836c..f84350efed7 100644 --- a/cdc/sink/dmlsink/cloudstorage/defragmenter_test.go +++ b/cdc/sink/dmlsink/cloudstorage/defragmenter_test.go @@ -26,20 +26,27 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/dmlsink" "github.com/pingcap/tiflow/cdc/sink/util" + "github.com/pingcap/tiflow/pkg/chann" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/sink/cloudstorage" "github.com/pingcap/tiflow/pkg/sink/codec/builder" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" ) func TestDeframenter(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) - outputCh := make(chan eventFragment, 1024) - outputFn := func(frag eventFragment) { - outputCh <- frag - } - defrag := newDefragmenter(ctx) - defrag.outputFn = outputFn + eg, egCtx := errgroup.WithContext(ctx) + + inputCh := make(chan eventFragment) + outputCh := chann.NewAutoDrainChann[eventFragment]() + defrag := newDefragmenter(inputCh, []*chann.DrainableChann[eventFragment]{outputCh}) + eg.Go(func() error { + return defrag.run(egCtx) + }) + uri := "file:///tmp/test" txnCnt := 50 sinkURI, err := url.Parse(uri) @@ -113,7 +120,7 @@ func TestDeframenter(t *testing.T) { for _, msg := range frag.encodedMsgs { msg.Key = []byte(strconv.Itoa(int(seq))) } - defrag.registerFrag(frag) + inputCh <- frag }(uint64(i + 1)) } @@ -121,7 +128,7 @@ func TestDeframenter(t *testing.T) { LOOP: for { select { - case frag := <-outputCh: + case frag := <-outputCh.Out(): for _, msg := range frag.encodedMsgs { curSeq, err := strconv.Atoi(string(msg.Key)) require.Nil(t, err) @@ -133,5 +140,5 @@ LOOP: } } cancel() - defrag.close() + require.ErrorIs(t, eg.Wait(), context.Canceled) } diff --git a/cdc/sink/dmlsink/cloudstorage/dml_worker.go b/cdc/sink/dmlsink/cloudstorage/dml_worker.go index 7264a600988..01af6352c24 100644 --- a/cdc/sink/dmlsink/cloudstorage/dml_worker.go +++ b/cdc/sink/dmlsink/cloudstorage/dml_worker.go @@ -44,6 +44,7 @@ type dmlWorker struct { config *cloudstorage.Config // flushNotifyCh is used to notify that several tables can be flushed. flushNotifyCh chan flushTask + inputCh *chann.DrainableChann[eventFragment] // tableEvents maintains a mapping of . tableEvents *tableEventsMap // fileSize maintains a mapping of . @@ -83,6 +84,7 @@ func newDMLWorker( storage storage.ExternalStorage, config *cloudstorage.Config, extension string, + inputCh *chann.DrainableChann[eventFragment], clock clock.Clock, statistics *metrics.Statistics, ) *dmlWorker { @@ -91,6 +93,7 @@ func newDMLWorker( changeFeedID: changefeedID, storage: storage, config: config, + inputCh: inputCh, tableEvents: newTableEventsMap(), flushNotifyCh: make(chan flushTask, 1), fileSize: make(map[cloudstorage.VersionedTableName]uint64), @@ -108,13 +111,8 @@ func newDMLWorker( return d } -// setClock is used for unit test -func (d *dmlWorker) setClock(clock clock.Clock) { - d.filePathGenerator.SetClock(clock) -} - // run creates a set of background goroutines. -func (d *dmlWorker) run(ctx context.Context, ch *chann.DrainableChann[eventFragment]) error { +func (d *dmlWorker) run(ctx context.Context) error { log.Debug("dml worker started", zap.Int("workerID", d.id), zap.String("namespace", d.changeFeedID.Namespace), zap.String("changefeed", d.changeFeedID.ID)) @@ -125,7 +123,7 @@ func (d *dmlWorker) run(ctx context.Context, ch *chann.DrainableChann[eventFragm }) eg.Go(func() error { - return d.dispatchFlushTasks(ctx, ch) + return d.dispatchFlushTasks(ctx, d.inputCh) }) return eg.Wait() diff --git a/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go b/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go index 3c0dc1fdc0d..5c4ccc20ff1 100644 --- a/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go +++ b/cdc/sink/dmlsink/cloudstorage/dml_worker_test.go @@ -50,15 +50,17 @@ func testDMLWorker(ctx context.Context, t *testing.T, dir string) *dmlWorker { statistics := metrics.NewStatistics(ctx, sink.TxnSink) d := newDMLWorker(1, model.DefaultChangeFeedID("dml-worker-test"), storage, - cfg, ".json", clock.New(), statistics) + cfg, ".json", chann.NewAutoDrainChann[eventFragment](), clock.New(), statistics) return d } func TestDMLWorkerRun(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) parentDir := t.TempDir() d := testDMLWorker(ctx, t, parentDir) - fragCh := chann.NewAutoDrainChann[eventFragment]() + fragCh := d.inputCh table1Dir := path.Join(parentDir, "test/table1/99") // assume table1 and table2 are dispatched to the same DML worker table1 := model.TableName{ @@ -119,7 +121,7 @@ func TestDMLWorkerRun(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - _ = d.run(ctx, fragCh) + _ = d.run(ctx) }() time.Sleep(4 * time.Second) diff --git a/cdc/sink/dmlsink/cloudstorage/dml_writer.go b/cdc/sink/dmlsink/cloudstorage/dml_writer.go deleted file mode 100644 index 8a9a4580eef..00000000000 --- a/cdc/sink/dmlsink/cloudstorage/dml_writer.go +++ /dev/null @@ -1,109 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. -package cloudstorage - -import ( - "context" - - "github.com/pingcap/tidb/br/pkg/storage" - "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/sink/metrics" - "github.com/pingcap/tiflow/engine/pkg/clock" - "github.com/pingcap/tiflow/pkg/chann" - "github.com/pingcap/tiflow/pkg/hash" - "github.com/pingcap/tiflow/pkg/sink/cloudstorage" - "golang.org/x/sync/errgroup" -) - -// dmlWriter manages a set of dmlWorkers and dispatches eventFragment to -// the dmlWorker according to hash algorithm. -type dmlWriter struct { - changefeedID model.ChangeFeedID - workers []*dmlWorker - workerChannels []*chann.DrainableChann[eventFragment] - hasher *hash.PositionInertia - storage storage.ExternalStorage - config *cloudstorage.Config - extension string - clock clock.Clock - statistics *metrics.Statistics - errCh chan<- error -} - -func newDMLWriter( - changefeedID model.ChangeFeedID, - storage storage.ExternalStorage, - config *cloudstorage.Config, - extension string, - statistics *metrics.Statistics, - errCh chan<- error, -) *dmlWriter { - d := &dmlWriter{ - changefeedID: changefeedID, - storage: storage, - workerChannels: make([]*chann.DrainableChann[eventFragment], config.WorkerCount), - workers: make([]*dmlWorker, config.WorkerCount), - hasher: hash.NewPositionInertia(), - clock: clock.New(), - config: config, - extension: extension, - statistics: statistics, - errCh: errCh, - } - - for i := 0; i < config.WorkerCount; i++ { - worker := newDMLWorker(i, changefeedID, storage, config, extension, d.clock, statistics) - d.workers[i] = worker - d.workerChannels[i] = chann.NewAutoDrainChann[eventFragment]() - } - - return d -} - -// setClock is used for unit test. -func (d *dmlWriter) setClock(clock clock.Clock) { - for i := 0; i < d.config.WorkerCount; i++ { - d.workers[i].setClock(clock) - } -} - -func (d *dmlWriter) run(ctx context.Context) error { - eg, ctx := errgroup.WithContext(ctx) - - for i := 0; i < d.config.WorkerCount; i++ { - worker := d.workers[i] - ch := d.workerChannels[i] - eg.Go(func() error { - return worker.run(ctx, ch) - }) - } - - return eg.Wait() -} - -func (d *dmlWriter) dispatchFragToDMLWorker(frag eventFragment) { - tableName := frag.versionedTable.TableNameWithPhysicTableID - d.hasher.Reset() - d.hasher.Write([]byte(tableName.Schema), []byte(tableName.Table)) - workerID := d.hasher.Sum32() % uint32(d.config.WorkerCount) - d.workerChannels[workerID].In() <- frag -} - -func (d *dmlWriter) close() { - for _, w := range d.workers { - w.close() - } - for _, ch := range d.workerChannels { - ch.CloseAndDrain() - } -} diff --git a/cdc/sink/dmlsink/cloudstorage/encoding_worker.go b/cdc/sink/dmlsink/cloudstorage/encoding_worker.go index 03d034eaa79..0ba69ef3957 100644 --- a/cdc/sink/dmlsink/cloudstorage/encoding_worker.go +++ b/cdc/sink/dmlsink/cloudstorage/encoding_worker.go @@ -31,23 +31,23 @@ type encodingWorker struct { changeFeedID model.ChangeFeedID encoder codec.TxnEventEncoder isClosed uint64 - inputCh chan eventFragment - defragmenter *defragmenter + inputCh <-chan eventFragment + outputCh chan<- eventFragment } func newEncodingWorker( workerID int, changefeedID model.ChangeFeedID, encoder codec.TxnEventEncoder, - inputCh chan eventFragment, - defragmenter *defragmenter, + inputCh <-chan eventFragment, + outputCh chan<- eventFragment, ) *encodingWorker { return &encodingWorker{ id: workerID, changeFeedID: changefeedID, encoder: encoder, inputCh: inputCh, - defragmenter: defragmenter, + outputCh: outputCh, } } @@ -84,7 +84,7 @@ func (w *encodingWorker) encodeEvents(frag eventFragment) error { } msgs := w.encoder.Build() frag.encodedMsgs = msgs - w.defragmenter.registerFrag(frag) + w.outputCh <- frag return nil } diff --git a/cdc/sink/dmlsink/cloudstorage/encoding_worker_test.go b/cdc/sink/dmlsink/cloudstorage/encoding_worker_test.go index b1e6b8f497a..2a525f09874 100644 --- a/cdc/sink/dmlsink/cloudstorage/encoding_worker_test.go +++ b/cdc/sink/dmlsink/cloudstorage/encoding_worker_test.go @@ -25,13 +25,17 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/dmlsink" "github.com/pingcap/tiflow/cdc/sink/util" + "github.com/pingcap/tiflow/pkg/chann" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/sink/cloudstorage" "github.com/pingcap/tiflow/pkg/sink/codec/builder" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" ) -func testEncodingWorker(ctx context.Context, t *testing.T) (*encodingWorker, func()) { +func testEncodingWorker( + t *testing.T, +) (*encodingWorker, chan eventFragment, chan eventFragment) { uri := fmt.Sprintf("file:///%s", t.TempDir()) sinkURI, err := url.Parse(uri) require.Nil(t, err) @@ -43,18 +47,23 @@ func testEncodingWorker(ctx context.Context, t *testing.T) (*encodingWorker, fun encoder := encoderBuilder.Build() changefeedID := model.DefaultChangeFeedID("test-encode") + encodedCh := make(chan eventFragment) msgCh := make(chan eventFragment, 1024) - defragmenter := newDefragmenter(ctx) - worker := newEncodingWorker(1, changefeedID, encoder, msgCh, defragmenter) - return worker, func() { - defragmenter.close() - } + return newEncodingWorker(1, changefeedID, encoder, msgCh, encodedCh), msgCh, encodedCh } func TestEncodeEvents(t *testing.T) { + t.Parallel() + + encodingWorker, _, encodedCh := testEncodingWorker(t) ctx, cancel := context.WithCancel(context.Background()) - worker, fn := testEncodingWorker(ctx, t) - defer fn() + eg, egCtx := errgroup.WithContext(ctx) + outputChs := []*chann.DrainableChann[eventFragment]{chann.NewAutoDrainChann[eventFragment]()} + defragmenter := newDefragmenter(encodedCh, outputChs) + eg.Go(func() error { + return defragmenter.run(egCtx) + }) + colInfos := []rowcodec.ColInfo{ { ID: 1, @@ -69,7 +78,7 @@ func TestEncodeEvents(t *testing.T) { Ft: types.NewFieldType(mysql.TypeString), }, } - err := worker.encodeEvents(eventFragment{ + err := encodingWorker.encodeEvents(eventFragment{ versionedTable: cloudstorage.VersionedTableName{ TableNameWithPhysicTableID: model.TableName{ Schema: "test", @@ -118,12 +127,21 @@ func TestEncodeEvents(t *testing.T) { }) require.Nil(t, err) cancel() + require.ErrorIs(t, eg.Wait(), context.Canceled) } func TestEncodingWorkerRun(t *testing.T) { + t.Parallel() + + encodingWorker, msgCh, encodedCh := testEncodingWorker(t) ctx, cancel := context.WithCancel(context.Background()) - worker, fn := testEncodingWorker(ctx, t) - defer fn() + eg, egCtx := errgroup.WithContext(ctx) + outputChs := []*chann.DrainableChann[eventFragment]{chann.NewAutoDrainChann[eventFragment]()} + defragmenter := newDefragmenter(encodedCh, outputChs) + eg.Go(func() error { + return defragmenter.run(egCtx) + }) + table := model.TableName{ Schema: "test", Table: "table1", @@ -166,17 +184,17 @@ func TestEncodingWorkerRun(t *testing.T) { Event: event, }, } - worker.inputCh <- frag + msgCh <- frag } var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() - _ = worker.run(ctx) + _ = encodingWorker.run(ctx) }() cancel() - worker.close() + encodingWorker.close() wg.Wait() }