From 17422184ec89184e8d1e2abefaa3e98f963a3b57 Mon Sep 17 00:00:00 2001
From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com>
Date: Tue, 25 Apr 2023 11:01:51 +0800
Subject: [PATCH] This is an automated cherry-pick of #8783

Signed-off-by: ti-chi-bot
---
 .../cloudstorage/cloud_storage_dml_sink.go    | 74 ++++++++++++++-----
 .../cloud_storage_dml_sink_test.go            | 21 +++++-
 .../eventsink/cloudstorage/defragmenter.go    | 64 ++++++++--------
 .../cloudstorage/defragmenter_test.go         | 25 ++++++-
 .../eventsink/cloudstorage/dml_worker.go      | 12 ++-
 .../eventsink/cloudstorage/dml_worker_test.go | 10 ++-
 .../eventsink/cloudstorage/encoding_worker.go | 14 +++-
 .../cloudstorage/encoding_worker_test.go      | 65 +++++++++++++---
 8 files changed, 204 insertions(+), 81 deletions(-)

diff --git a/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go b/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go
index fd08f0b02a1..de1f0d89987 100644
--- a/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go
+++ b/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go
@@ -22,12 +22,21 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/tiflow/cdc/contextutil"
 	"github.com/pingcap/tiflow/cdc/model"
+<<<<<<< HEAD:cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go
 	"github.com/pingcap/tiflow/cdc/sink/codec/builder"
 	"github.com/pingcap/tiflow/cdc/sink/codec/common"
 	"github.com/pingcap/tiflow/cdc/sinkv2/eventsink"
 	"github.com/pingcap/tiflow/cdc/sinkv2/metrics"
 	"github.com/pingcap/tiflow/cdc/sinkv2/tablesink/state"
 	"github.com/pingcap/tiflow/cdc/sinkv2/util"
+=======
+	"github.com/pingcap/tiflow/cdc/sink/dmlsink"
+	"github.com/pingcap/tiflow/cdc/sink/metrics"
+	"github.com/pingcap/tiflow/cdc/sink/tablesink/state"
+	"github.com/pingcap/tiflow/cdc/sink/util"
+	"github.com/pingcap/tiflow/engine/pkg/clock"
+	"github.com/pingcap/tiflow/pkg/chann"
+>>>>>>> 051cb2efdc (sink(ticdc): remove redundant channel and dmlWriter in storage sink (#8783)):cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go
 	"github.com/pingcap/tiflow/pkg/config"
 	cerror "github.com/pingcap/tiflow/pkg/errors"
 	"github.com/pingcap/tiflow/pkg/sink"
@@ -63,18 +72,19 @@ type eventFragment struct {
 // It will send the events to cloud storage systems.
 type dmlSink struct {
 	changefeedID model.ChangeFeedID
+	// last sequence number
+	lastSeqNum uint64
 	// msgCh is a channel to hold eventFragment.
 	msgCh chan eventFragment
 	// encodingWorkers defines a group of workers for encoding events.
 	encodingWorkers []*encodingWorker
-	// defragmenter is used to defragment the out-of-order encoded messages.
-	defragmenter *defragmenter
-	// writer is a dmlWriter which manages a group of dmlWorkers and
+	// defragmenter is used to defragment the out-of-order encoded messages and
 	// sends encoded messages to individual dmlWorkers.
-	writer *dmlWriter
+	defragmenter *defragmenter
+	// workers defines a group of workers for writing events to external storage.
+	workers []*dmlWorker
+
 	statistics *metrics.Statistics
-	// last sequence number
-	lastSeqNum uint64
 
 	cancel func()
 	wg     sync.WaitGroup
@@ -125,20 +135,30 @@ func NewCloudStorageSink(
 	s := &dmlSink{
 		changefeedID:    contextutil.ChangefeedIDFromCtx(wgCtx),
 		msgCh:           make(chan eventFragment, defaultChannelSize),
-		encodingWorkers: make([]*encodingWorker, 0, defaultEncodingConcurrency),
-		defragmenter:    newDefragmenter(wgCtx),
+		encodingWorkers: make([]*encodingWorker, defaultEncodingConcurrency),
+		workers:         make([]*dmlWorker, cfg.WorkerCount),
 		statistics:      metrics.NewStatistics(wgCtx, sink.TxnSink),
 		cancel:          wgCancel,
 		dead:            make(chan struct{}),
 	}
+	encodedCh := make(chan eventFragment, defaultChannelSize)
+	workerChannels := make([]*chann.DrainableChann[eventFragment], cfg.WorkerCount)
+
 	// create a group of encoding workers.
 	for i := 0; i < defaultEncodingConcurrency; i++ {
 		encoder := encoderBuilder.Build()
-		w := newEncodingWorker(i, s.changefeedID, encoder, s.msgCh, s.defragmenter)
-		s.encodingWorkers = append(s.encodingWorkers, w)
+		s.encodingWorkers[i] = newEncodingWorker(i, s.changefeedID, encoder, s.msgCh, encodedCh)
+	}
+	// create defragmenter.
+	s.defragmenter = newDefragmenter(encodedCh, workerChannels)
+	// create a group of dml workers.
+	clock := clock.New()
+	for i := 0; i < cfg.WorkerCount; i++ {
+		inputCh := chann.NewAutoDrainChann[eventFragment]()
+		s.workers[i] = newDMLWorker(i, s.changefeedID, storage, cfg, ext,
+			inputCh, clock, s.statistics)
+		workerChannels[i] = inputCh
 	}
-	orderedCh := s.defragmenter.orderedOut()
-	s.writer = newDMLWriter(s.changefeedID, storage, cfg, ext, s.statistics, orderedCh, errCh)
 
 	s.wg.Add(1)
 	go func() {
@@ -159,14 +179,23 @@ func NewCloudStorageSink(
 func (s *dmlSink) run(ctx context.Context) error {
 	eg, ctx := errgroup.WithContext(ctx)
 
-	// run dml writer
-	eg.Go(func() error {
-		return s.writer.run(ctx)
-	})
 
 	// run the encoding workers.
 	for i := 0; i < defaultEncodingConcurrency; i++ {
-		worker := s.encodingWorkers[i]
+		encodingWorker := s.encodingWorkers[i]
+		eg.Go(func() error {
+			return encodingWorker.run(ctx)
+		})
+	}
+
+	// run the defragmenter.
+	eg.Go(func() error {
+		return s.defragmenter.run(ctx)
+	})
+
+	// run dml workers.
+	for i := 0; i < len(s.workers); i++ {
+		worker := s.workers[i]
 		eg.Go(func() error {
 			return worker.run(ctx)
 		})
@@ -212,14 +241,21 @@ func (s *dmlSink) Close() {
 	}
 	s.wg.Wait()
 
-	if s.defragmenter != nil {
-		s.defragmenter.close()
+	for _, encodingWorker := range s.encodingWorkers {
+		encodingWorker.close()
 	}
+<<<<<<< HEAD:cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go
 	for _, w := range s.encodingWorkers {
 		w.close()
 	}
 	s.writer.close()
+=======
+	for _, worker := range s.workers {
+		worker.close()
+	}
+
+>>>>>>> 051cb2efdc (sink(ticdc): remove redundant channel and dmlWriter in storage sink (#8783)):cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go
 	if s.statistics != nil {
 		s.statistics.Close()
 	}
diff --git a/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink_test.go b/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink_test.go
index 42ec152d308..0204ce5972f 100644
--- a/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink_test.go
+++ b/cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink_test.go
@@ -33,6 +33,12 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
+func setClock(s *DMLSink, clock clock.Clock) {
+	for _, w := range s.workers {
+		w.filePathGenerator.SetClock(clock)
+	}
+}
+
 func generateTxnEvents(
 	cnt *uint64,
 	batch int,
@@ -84,6 +90,8 @@ func generateTxnEvents(
 }
 
 func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) {
+	t.Parallel()
+
 	ctx, cancel := context.WithCancel(context.Background())
 	parentDir := t.TempDir()
 	uri := fmt.Sprintf("file:///%s?flush-interval=2s", parentDir)
@@ -154,6 +162,8 @@ func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) {
 }
 
 func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
+	t.Parallel()
+
 	ctx, cancel := context.WithCancel(context.Background())
 	parentDir := t.TempDir()
 	uri := fmt.Sprintf("file:///%s?flush-interval=2s", parentDir)
@@ -168,7 +178,7 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
 	s, err := NewCloudStorageSink(ctx, sinkURI, replicaConfig, errCh)
 	require.Nil(t, err)
 	mockClock := clock.NewMock()
-	s.writer.setClock(mockClock)
+	setClock(s, mockClock)
 
 	var cnt uint64 = 0
 	batch := 100
@@ -200,7 +210,8 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
 
 	// test date (day) is NOT changed.
 	mockClock.Set(time.Date(2023, 3, 8, 23, 59, 59, 0, time.UTC))
-	s.writer.setClock(mockClock)
+	setClock(s, mockClock)
+
 	err = s.WriteEvents(txns...)
 	require.Nil(t, err)
 	time.Sleep(3 * time.Second)
@@ -224,7 +235,8 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
 
 	// test date (day) is changed.
 	mockClock.Set(time.Date(2023, 3, 9, 0, 0, 10, 0, time.UTC))
-	s.writer.setClock(mockClock)
+	setClock(s, mockClock)
+
 	err = s.WriteEvents(txns...)
 	require.Nil(t, err)
 	time.Sleep(3 * time.Second)
@@ -256,7 +268,8 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
 	require.Nil(t, err)
 	mockClock = clock.NewMock()
 	mockClock.Set(time.Date(2023, 3, 9, 0, 1, 10, 0, time.UTC))
-	s.writer.setClock(mockClock)
+	setClock(s, mockClock)
+
 	err = s.WriteEvents(txns...)
 	require.Nil(t, err)
 	time.Sleep(3 * time.Second)
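Reviewer note: the two files above are the heart of the change. `dmlSink` now wires `msgCh` -> encoding workers -> `encodedCh` -> defragmenter -> one `chann.DrainableChann` per DML worker, and `run` supervises every stage under a single errgroup so the first failing stage cancels all the others. Below is a minimal, runnable model of that wiring, assuming plain ints as stand-ins for `eventFragment` and a squaring step as the "encoder"; none of these names are APIs from this patch.

```go
package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

func main() {
	// One shared input feeds a pool of encoders; the encoders fan in to a
	// single encoded channel that a lone downstream consumer drains.
	msgCh := make(chan int, 16)
	encodedCh := make(chan int, 16)

	eg, ctx := errgroup.WithContext(context.Background())

	var encoders sync.WaitGroup
	for i := 0; i < 2; i++ {
		encoders.Add(1)
		eg.Go(func() error {
			defer encoders.Done()
			for {
				select {
				case <-ctx.Done():
					return ctx.Err()
				case v, ok := <-msgCh:
					if !ok {
						return nil // input closed: clean shutdown
					}
					encodedCh <- v * v // stand-in for "encode the fragment"
				}
			}
		})
	}
	// Fan-in bookkeeping: close encodedCh only after every encoder returned.
	go func() { encoders.Wait(); close(encodedCh) }()

	// The "defragmenter" seat: sole consumer of the fan-in channel.
	eg.Go(func() error {
		for v := range encodedCh {
			fmt.Println("encoded:", v)
		}
		return nil
	})

	for i := 1; i <= 5; i++ {
		msgCh <- i
	}
	close(msgCh)
	if err := eg.Wait(); err != nil {
		fmt.Println("pipeline error:", err)
	}
}
```

The design point worth checking in review is the fan-in close: the encoded channel may be closed only after every encoder has returned, otherwise a still-running encoder would panic sending on a closed channel.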
diff --git a/cdc/sinkv2/eventsink/cloudstorage/defragmenter.go b/cdc/sinkv2/eventsink/cloudstorage/defragmenter.go
index 7d898037a3d..a637a531c2e 100644
--- a/cdc/sinkv2/eventsink/cloudstorage/defragmenter.go
+++ b/cdc/sinkv2/eventsink/cloudstorage/defragmenter.go
@@ -14,7 +14,10 @@ package cloudstorage
 
 import (
 	"context"
-	"sync"
+
+	"github.com/pingcap/tiflow/pkg/chann"
+	"github.com/pingcap/tiflow/pkg/errors"
+	"github.com/pingcap/tiflow/pkg/hash"
 )
 
 // defragmenter is used to handle event fragments which can be registered
@@ -22,42 +25,33 @@ type defragmenter struct {
 	lastWritten uint64
 	future      map[uint64]eventFragment
-	wg          sync.WaitGroup
-	inputCh     chan eventFragment
-	outputCh    chan eventFragment
+	inputCh   <-chan eventFragment
+	outputChs []*chann.DrainableChann[eventFragment]
+	hasher    *hash.PositionInertia
 }
 
-func newDefragmenter(ctx context.Context) *defragmenter {
-	d := &defragmenter{
-		future:   make(map[uint64]eventFragment),
-		inputCh:  make(chan eventFragment, defaultChannelSize),
-		outputCh: make(chan eventFragment, defaultChannelSize),
+func newDefragmenter(
+	inputCh <-chan eventFragment,
+	outputChs []*chann.DrainableChann[eventFragment],
+) *defragmenter {
+	return &defragmenter{
+		future:    make(map[uint64]eventFragment),
+		inputCh:   inputCh,
+		outputChs: outputChs,
+		hasher:    hash.NewPositionInertia(),
 	}
-	d.wg.Add(1)
-	go func() {
-		defer d.wg.Done()
-		d.defragMsgs(ctx)
-	}()
-	return d
-}
-
-func (d *defragmenter) registerFrag(frag eventFragment) {
-	d.inputCh <- frag
-}
-
-func (d *defragmenter) orderedOut() <-chan eventFragment {
-	return d.outputCh
 }
 
-func (d *defragmenter) defragMsgs(ctx context.Context) {
+func (d *defragmenter) run(ctx context.Context) error {
+	defer d.close()
 	for {
 		select {
 		case <-ctx.Done():
 			d.future = nil
-			return
+			return errors.Trace(ctx.Err())
 		case frag, ok := <-d.inputCh:
 			if !ok {
-				return
+				return nil
 			}
 			// check whether to write messages to output channel right now
 			next := d.lastWritten + 1
@@ -66,7 +60,7 @@ func (d *defragmenter) defragMsgs(ctx context.Context) {
 			} else if frag.seqNumber > next {
 				d.future[frag.seqNumber] = frag
 			} else {
-				return
+				return nil
 			}
 		}
 	}
@@ -76,7 +70,7 @@ func (d *defragmenter) writeMsgsConsecutive(
 	ctx context.Context,
 	start eventFragment,
 ) {
-	d.outputCh <- start
+	d.dispatchFragToDMLWorker(start)
 
 	d.lastWritten++
 	for {
@@ -89,7 +83,7 @@ func (d *defragmenter) writeMsgsConsecutive(
 		next := d.lastWritten + 1
 		if frag, ok := d.future[next]; ok {
 			delete(d.future, next)
-			d.outputCh <- frag
+			d.dispatchFragToDMLWorker(frag)
 			d.lastWritten = next
 		} else {
 			return
@@ -97,6 +91,16 @@ func (d *defragmenter) writeMsgsConsecutive(
 		}
 	}
 }
 
+func (d *defragmenter) dispatchFragToDMLWorker(frag eventFragment) {
+	tableName := frag.versionedTable.TableNameWithPhysicTableID
+	d.hasher.Reset()
+	d.hasher.Write([]byte(tableName.Schema), []byte(tableName.Table))
+	workerID := d.hasher.Sum32() % uint32(len(d.outputChs))
+	d.outputChs[workerID].In() <- frag
+}
+
 func (d *defragmenter) close() {
-	d.wg.Wait()
+	for _, ch := range d.outputChs {
+		ch.CloseAndDrain()
+	}
 }
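Reviewer note: the defragmenter above does two jobs. `run`/`writeMsgsConsecutive` release fragments strictly in `seqNumber` order, parking early arrivals in `future`, and `dispatchFragToDMLWorker` pins each table to a fixed worker by hashing schema and table name, so one table's files are only ever written by one goroutine. A compact, runnable model of both behaviors, using `hash/fnv` as a stand-in for tiflow's `hash.PositionInertia` and hypothetical `frag`/`defrag` types:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

type frag struct {
	seq    uint64
	schema string
	table  string
}

type defrag struct {
	lastWritten uint64
	future      map[uint64]frag
	outs        []chan frag
}

// dispatch pins a table to one output queue: the same schema/table pair
// always hashes to the same worker index.
func (d *defrag) dispatch(f frag) {
	h := fnv.New32a() // stand-in for hash.PositionInertia
	h.Write([]byte(f.schema))
	h.Write([]byte(f.table))
	d.outs[h.Sum32()%uint32(len(d.outs))] <- f
}

// push accepts fragments in any order but releases them strictly in
// seq order, buffering early arrivals in future until their turn.
func (d *defrag) push(f frag) {
	d.future[f.seq] = f
	for {
		next, ok := d.future[d.lastWritten+1]
		if !ok {
			return
		}
		delete(d.future, d.lastWritten+1)
		d.dispatch(next)
		d.lastWritten++
	}
}

func main() {
	d := &defrag{
		future: make(map[uint64]frag),
		outs:   []chan frag{make(chan frag, 8), make(chan frag, 8)},
	}
	// Encoders may finish out of order: 3 and 2 arrive before 1.
	for _, s := range []uint64{3, 2, 1, 4} {
		d.push(frag{seq: s, schema: "test", table: "table1"})
	}
	close(d.outs[0])
	close(d.outs[1])
	for i, out := range d.outs {
		for f := range out {
			fmt.Printf("worker %d got seq %d\n", i, f.seq)
		}
	}
}
```

Running it prints seq 1 through 4 in order on a single worker channel regardless of arrival order, which is the invariant the test below relies on.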
"github.com/pingcap/tiflow/cdc/sink/codec/builder" "github.com/pingcap/tiflow/cdc/sinkv2/eventsink" "github.com/pingcap/tiflow/cdc/sinkv2/util" +======= + "github.com/pingcap/tiflow/cdc/sink/dmlsink" + "github.com/pingcap/tiflow/cdc/sink/util" + "github.com/pingcap/tiflow/pkg/chann" +>>>>>>> 051cb2efdc (sink(ticdc): remove redundant channel and dmlWriter in storage sink (#8783)):cdc/sink/dmlsink/cloudstorage/defragmenter_test.go "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/sink/cloudstorage" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" ) func TestDeframenter(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) - defrag := newDefragmenter(ctx) + eg, egCtx := errgroup.WithContext(ctx) + + inputCh := make(chan eventFragment) + outputCh := chann.NewAutoDrainChann[eventFragment]() + defrag := newDefragmenter(inputCh, []*chann.DrainableChann[eventFragment]{outputCh}) + eg.Go(func() error { + return defrag.run(egCtx) + }) + uri := "file:///tmp/test" txnCnt := 50 sinkURI, err := url.Parse(uri) @@ -90,7 +107,7 @@ func TestDeframenter(t *testing.T) { for _, msg := range frag.encodedMsgs { msg.Key = []byte(strconv.Itoa(int(seq))) } - defrag.registerFrag(frag) + inputCh <- frag }(uint64(i + 1)) } @@ -98,7 +115,7 @@ func TestDeframenter(t *testing.T) { LOOP: for { select { - case frag := <-defrag.orderedOut(): + case frag := <-outputCh.Out(): for _, msg := range frag.encodedMsgs { curSeq, err := strconv.Atoi(string(msg.Key)) require.Nil(t, err) @@ -110,5 +127,5 @@ LOOP: } } cancel() - defrag.close() + require.ErrorIs(t, eg.Wait(), context.Canceled) } diff --git a/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go b/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go index 80ed774cf5c..a41e9ca7dc2 100644 --- a/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go +++ b/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go @@ -44,6 +44,7 @@ type dmlWorker struct { config *cloudstorage.Config // flushNotifyCh is used to notify that several tables can be flushed. flushNotifyCh chan flushTask + inputCh *chann.DrainableChann[eventFragment] // tableEvents maintains a mapping of . tableEvents *tableEventsMap // fileSize maintains a mapping of . @@ -83,6 +84,7 @@ func newDMLWorker( storage storage.ExternalStorage, config *cloudstorage.Config, extension string, + inputCh *chann.DrainableChann[eventFragment], clock clock.Clock, statistics *metrics.Statistics, ) *dmlWorker { @@ -91,6 +93,7 @@ func newDMLWorker( changeFeedID: changefeedID, storage: storage, config: config, + inputCh: inputCh, tableEvents: newTableEventsMap(), flushNotifyCh: make(chan flushTask, 1), fileSize: make(map[cloudstorage.VersionedTableName]uint64), @@ -108,13 +111,8 @@ func newDMLWorker( return d } -// setClock is used for unit test -func (d *dmlWorker) setClock(clock clock.Clock) { - d.filePathGenerator.SetClock(clock) -} - // run creates a set of background goroutines. 
diff --git a/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go b/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go
index 80ed774cf5c..a41e9ca7dc2 100644
--- a/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go
+++ b/cdc/sinkv2/eventsink/cloudstorage/dml_worker.go
@@ -44,6 +44,7 @@ type dmlWorker struct {
 	config *cloudstorage.Config
 	// flushNotifyCh is used to notify that several tables can be flushed.
 	flushNotifyCh chan flushTask
+	inputCh       *chann.DrainableChann[eventFragment]
 	// tableEvents maintains a mapping of <table, events>.
 	tableEvents *tableEventsMap
 	// fileSize maintains a mapping of <table, file size>.
@@ -83,6 +84,7 @@ func newDMLWorker(
 	storage storage.ExternalStorage,
 	config *cloudstorage.Config,
 	extension string,
+	inputCh *chann.DrainableChann[eventFragment],
 	clock clock.Clock,
 	statistics *metrics.Statistics,
 ) *dmlWorker {
@@ -91,6 +93,7 @@ func newDMLWorker(
 		changeFeedID:  changefeedID,
 		storage:       storage,
 		config:        config,
+		inputCh:       inputCh,
 		tableEvents:   newTableEventsMap(),
 		flushNotifyCh: make(chan flushTask, 1),
 		fileSize:      make(map[cloudstorage.VersionedTableName]uint64),
@@ -108,13 +111,8 @@ func newDMLWorker(
 	return d
 }
 
-// setClock is used for unit test
-func (d *dmlWorker) setClock(clock clock.Clock) {
-	d.filePathGenerator.SetClock(clock)
-}
-
 // run creates a set of background goroutines.
-func (d *dmlWorker) run(ctx context.Context, ch *chann.DrainableChann[eventFragment]) error {
+func (d *dmlWorker) run(ctx context.Context) error {
 	log.Debug("dml worker started", zap.Int("workerID", d.id),
 		zap.String("namespace", d.changeFeedID.Namespace),
 		zap.String("changefeed", d.changeFeedID.ID))
@@ -125,7 +123,7 @@ func (d *dmlWorker) run(ctx context.Context, ch *chann.DrainableChann[eventFragm
 	})
 
 	eg.Go(func() error {
-		return d.dispatchFlushTasks(ctx, ch)
+		return d.dispatchFlushTasks(ctx, d.inputCh)
 	})
 
 	return eg.Wait()
diff --git a/cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go b/cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go
index 6f1337d04e5..5f4ce85bf88 100644
--- a/cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go
+++ b/cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go
@@ -50,15 +50,21 @@ func testDMLWorker(ctx context.Context, t *testing.T, dir string) *dmlWorker {
 	statistics := metrics.NewStatistics(ctx, sink.TxnSink)
 	d := newDMLWorker(1, model.DefaultChangeFeedID("dml-worker-test"), storage,
-		cfg, ".json", clock.New(), statistics)
+		cfg, ".json", chann.NewAutoDrainChann[eventFragment](), clock.New(), statistics)
 	return d
 }
 
 func TestDMLWorkerRun(t *testing.T) {
+	t.Parallel()
+
 	ctx, cancel := context.WithCancel(context.Background())
 	parentDir := t.TempDir()
 	d := testDMLWorker(ctx, t, parentDir)
+<<<<<<< HEAD:cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go
 	fragCh := chann.NewDrainableChann[eventFragment]()
+=======
+	fragCh := d.inputCh
+>>>>>>> 051cb2efdc (sink(ticdc): remove redundant channel and dmlWriter in storage sink (#8783)):cdc/sink/dmlsink/cloudstorage/dml_worker_test.go
 	table1Dir := path.Join(parentDir, "test/table1/99")
 	// assume table1 and table2 are dispatched to the same DML worker
 	table1 := model.TableName{
@@ -119,7 +125,7 @@ func TestDMLWorkerRun(t *testing.T) {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		_ = d.run(ctx, fragCh)
+		_ = d.run(ctx)
 	}()
 
 	time.Sleep(4 * time.Second)
diff --git a/cdc/sinkv2/eventsink/cloudstorage/encoding_worker.go b/cdc/sinkv2/eventsink/cloudstorage/encoding_worker.go
index 494931eca1f..f85e4cea7a9 100644
--- a/cdc/sinkv2/eventsink/cloudstorage/encoding_worker.go
+++ b/cdc/sinkv2/eventsink/cloudstorage/encoding_worker.go
@@ -31,23 +31,29 @@ type encodingWorker struct {
 	changeFeedID model.ChangeFeedID
 	encoder      codec.EventBatchEncoder
 	isClosed     uint64
-	inputCh      chan eventFragment
-	defragmenter *defragmenter
+	inputCh      <-chan eventFragment
+	outputCh     chan<- eventFragment
 }
 
 func newEncodingWorker(
 	workerID int,
 	changefeedID model.ChangeFeedID,
+<<<<<<< HEAD:cdc/sinkv2/eventsink/cloudstorage/encoding_worker.go
 	encoder codec.EventBatchEncoder,
 	inputCh chan eventFragment,
 	defragmenter *defragmenter,
+=======
+	encoder codec.TxnEventEncoder,
+	inputCh <-chan eventFragment,
+	outputCh chan<- eventFragment,
+>>>>>>> 051cb2efdc (sink(ticdc): remove redundant channel and dmlWriter in storage sink (#8783)):cdc/sink/dmlsink/cloudstorage/encoding_worker.go
 ) *encodingWorker {
 	return &encodingWorker{
 		id:           workerID,
 		changeFeedID: changefeedID,
 		encoder:      encoder,
 		inputCh:      inputCh,
-		defragmenter: defragmenter,
+		outputCh:     outputCh,
 	}
 }
 
@@ -97,7 +103,7 @@ func (w *encodingWorker) encodeEvents(ctx context.Context, frag eventFragment) e
 	msgs := w.encoder.Build()
 	frag.encodedMsgs = msgs
 
-	w.defragmenter.registerFrag(frag)
+	w.outputCh <- frag
 	return nil
 }
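Reviewer note: `encodingWorker` no longer keeps a `*defragmenter`; it only sees a receive-only input and a send-only output. That removes the type-level cycle between worker and defragmenter and is what lets the tests below drive a worker with bare channels. A tiny illustration of the same directional-channel decoupling, with a hypothetical `worker` rather than this patch's type:

```go
package main

import "fmt"

// worker is wired with directional channel types only: it can receive
// from in and send to out, and holds no reference to the next stage.
func worker(in <-chan string, out chan<- string) {
	for s := range in {
		out <- "encoded:" + s // stand-in for building codec messages
	}
	close(out)
}

func main() {
	in := make(chan string, 2)
	out := make(chan string, 2)
	go worker(in, out)

	in <- "row1"
	in <- "row2"
	close(in)
	for s := range out {
		fmt.Println(s)
	}
}
```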
diff --git a/cdc/sinkv2/eventsink/cloudstorage/encoding_worker_test.go b/cdc/sinkv2/eventsink/cloudstorage/encoding_worker_test.go
index 4b9a76f0747..9693e4f65b2 100644
--- a/cdc/sinkv2/eventsink/cloudstorage/encoding_worker_test.go
+++ b/cdc/sinkv2/eventsink/cloudstorage/encoding_worker_test.go
@@ -20,15 +20,24 @@ import (
 	"testing"
 
 	"github.com/pingcap/tiflow/cdc/model"
+<<<<<<< HEAD:cdc/sinkv2/eventsink/cloudstorage/encoding_worker_test.go
 	"github.com/pingcap/tiflow/cdc/sink/codec/builder"
 	"github.com/pingcap/tiflow/cdc/sinkv2/eventsink"
 	"github.com/pingcap/tiflow/cdc/sinkv2/util"
+=======
+	"github.com/pingcap/tiflow/cdc/sink/dmlsink"
+	"github.com/pingcap/tiflow/cdc/sink/util"
+	"github.com/pingcap/tiflow/pkg/chann"
+>>>>>>> 051cb2efdc (sink(ticdc): remove redundant channel and dmlWriter in storage sink (#8783)):cdc/sink/dmlsink/cloudstorage/encoding_worker_test.go
 	"github.com/pingcap/tiflow/pkg/config"
 	"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
 	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
 )
 
-func testEncodingWorker(ctx context.Context, t *testing.T) (*encodingWorker, func()) {
+func testEncodingWorker(
+	t *testing.T,
+) (*encodingWorker, chan eventFragment, chan eventFragment) {
 	uri := fmt.Sprintf("file:///%s", t.TempDir())
 	sinkURI, err := url.Parse(uri)
 	require.Nil(t, err)
@@ -40,19 +49,44 @@ func testEncodingWorker(ctx context.Context, t *testing.T) (*encodingWorker, fun
 	encoder := encoderBuilder.Build()
 	changefeedID := model.DefaultChangeFeedID("test-encode")
 
+	encodedCh := make(chan eventFragment)
 	msgCh := make(chan eventFragment, 1024)
-	defragmenter := newDefragmenter(ctx)
-	worker := newEncodingWorker(1, changefeedID, encoder, msgCh, defragmenter)
-	return worker, func() {
-		defragmenter.close()
-	}
+	return newEncodingWorker(1, changefeedID, encoder, msgCh, encodedCh), msgCh, encodedCh
 }
 
 func TestEncodeEvents(t *testing.T) {
+	t.Parallel()
+
+	encodingWorker, _, encodedCh := testEncodingWorker(t)
 	ctx, cancel := context.WithCancel(context.Background())
+<<<<<<< HEAD:cdc/sinkv2/eventsink/cloudstorage/encoding_worker_test.go
 	worker, fn := testEncodingWorker(ctx, t)
 	defer fn()
 	err := worker.encodeEvents(ctx, eventFragment{
+=======
+	eg, egCtx := errgroup.WithContext(ctx)
+	outputChs := []*chann.DrainableChann[eventFragment]{chann.NewAutoDrainChann[eventFragment]()}
+	defragmenter := newDefragmenter(encodedCh, outputChs)
+	eg.Go(func() error {
+		return defragmenter.run(egCtx)
+	})
+
+	colInfos := []rowcodec.ColInfo{
+		{
+			ID:            1,
+			IsPKHandle:    false,
+			VirtualGenCol: false,
+			Ft:            types.NewFieldType(mysql.TypeLong),
+		},
+		{
+			ID:            2,
+			IsPKHandle:    false,
+			VirtualGenCol: false,
+			Ft:            types.NewFieldType(mysql.TypeString),
+		},
+	}
+	err := encodingWorker.encodeEvents(eventFragment{
+>>>>>>> 051cb2efdc (sink(ticdc): remove redundant channel and dmlWriter in storage sink (#8783)):cdc/sink/dmlsink/cloudstorage/encoding_worker_test.go
 		versionedTable: cloudstorage.VersionedTableName{
 			TableNameWithPhysicTableID: model.TableName{
 				Schema: "test",
@@ -99,12 +133,21 @@ func TestEncodeEvents(t *testing.T) {
 	})
 	require.Nil(t, err)
 	cancel()
+	require.ErrorIs(t, eg.Wait(), context.Canceled)
 }
 
 func TestEncodingWorkerRun(t *testing.T) {
+	t.Parallel()
+
+	encodingWorker, msgCh, encodedCh := testEncodingWorker(t)
 	ctx, cancel := context.WithCancel(context.Background())
-	worker, fn := testEncodingWorker(ctx, t)
-	defer fn()
+	eg, egCtx := errgroup.WithContext(ctx)
+	outputChs := []*chann.DrainableChann[eventFragment]{chann.NewAutoDrainChann[eventFragment]()}
+	defragmenter := newDefragmenter(encodedCh, outputChs)
+	eg.Go(func() error {
+		return defragmenter.run(egCtx)
+	})
+
 	table := model.TableName{
 		Schema: "test",
 		Table:  "table1",
 	}
@@ -143,17 +186,17 @@ func TestEncodingWorkerRun(t *testing.T) {
 				Event: event,
 			},
 		}
-		worker.inputCh <- frag
+		msgCh <- frag
 	}
 
 	var wg sync.WaitGroup
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		_ = worker.run(ctx)
+		_ = encodingWorker.run(ctx)
 	}()
 
 	cancel()
-	worker.close()
+	encodingWorker.close()
 	wg.Wait()
 }
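Reviewer note: taken as a whole, shutdown now happens in a fixed order. `Close` cancels the shared context, waits on the WaitGroup for `run` to unwind, then closes the encoding workers and the DML workers, while the defragmenter's deferred `close` calls `CloseAndDrain` on each worker queue so its internal goroutine can exit. A minimal model of that cancel -> wait -> drain sequence, with a hypothetical `sink` type and a plain buffered channel loosely standing in for `chann.DrainableChann`:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// sink models the Close() sequence this patch settles on:
// 1. cancel the shared context, 2. wait for background goroutines,
// 3. close and drain the leftover queue.
type sink struct {
	cancel func()
	wg     sync.WaitGroup
	queue  chan int
}

func newSink() *sink {
	ctx, cancel := context.WithCancel(context.Background())
	s := &sink{cancel: cancel, queue: make(chan int, 8)}
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		<-ctx.Done() // stand-in for the pipeline's run() loops
	}()
	return s
}

func (s *sink) Close() {
	s.cancel()  // stop producers first
	s.wg.Wait() // then wait for every background goroutine
	close(s.queue)
	for range s.queue { // drain leftovers, loosely like CloseAndDrain
	}
	fmt.Println("closed cleanly")
}

func main() {
	s := newSink()
	s.queue <- 1
	s.queue <- 2
	s.Close()
}
```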