remove redundant channel
CharlesCheung96 committed Apr 12, 2023
1 parent ab752f4 commit d9f0950
Showing 4 changed files with 20 additions and 36 deletions.
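In short: the defragmenter's buffered outputCh and the dmlWriter's matching inputCh are removed, and the two components are connected by a direct callback instead. The defragmenter now invokes an outputFn field, which the sink points at the writer's dispatchFragToDMLWorker method. A minimal standalone sketch of that channel-to-callback pattern, with simplified hypothetical types (frag, producer, consumer) rather than the tiflow ones:

    package main

    import "fmt"

    // frag stands in for tiflow's eventFragment.
    type frag struct{ seq uint64 }

    // producer stands in for the defragmenter: it used to own a
    // buffered output channel; now the consumer installs a callback.
    type producer struct {
        outputFn func(frag)
    }

    // consumer stands in for dmlWriter.
    type consumer struct{}

    func (c consumer) dispatch(f frag) { fmt.Println("dispatched", f.seq) }

    func main() {
        p := &producer{}
        c := consumer{}
        p.outputFn = c.dispatch // the wiring this commit adds
        for i := uint64(1); i <= 3; i++ {
            p.outputFn(frag{seq: i}) // delivered synchronously, no middle channel
        }
    }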
4 changes: 2 additions & 2 deletions cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go
@@ -137,8 +137,8 @@ func NewDMLSink(ctx context.Context,
 		w := newEncodingWorker(i, s.changefeedID, encoder, s.msgCh, s.defragmenter)
 		s.encodingWorkers = append(s.encodingWorkers, w)
 	}
-	orderedCh := s.defragmenter.orderedOut()
-	s.writer = newDMLWriter(s.changefeedID, storage, cfg, ext, s.statistics, orderedCh, errCh)
+	s.writer = newDMLWriter(s.changefeedID, storage, cfg, ext, s.statistics, errCh)
+	s.defragmenter.outputFn = s.writer.dispatchFragToDMLWorker
 
 	s.wg.Add(1)
 	go func() {
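The constructor is the wiring point: newDMLWriter drops its channel parameter, and the defragmenter's outputFn is set to the writer's dispatch method, so encoded fragments now reach the writer through a synchronous call rather than through an intermediate buffered channel.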
16 changes: 6 additions & 10 deletions cdc/sink/dmlsink/cloudstorage/defragmenter.go
@@ -24,14 +24,14 @@ type defragmenter struct {
 	future   map[uint64]eventFragment
 	wg       sync.WaitGroup
 	inputCh  chan eventFragment
-	outputCh chan eventFragment
+
+	outputFn func(eventFragment)
 }
 
 func newDefragmenter(ctx context.Context) *defragmenter {
 	d := &defragmenter{
-		future:   make(map[uint64]eventFragment),
-		inputCh:  make(chan eventFragment, defaultChannelSize),
-		outputCh: make(chan eventFragment, defaultChannelSize),
+		future:  make(map[uint64]eventFragment),
+		inputCh: make(chan eventFragment, defaultChannelSize),
 	}
 	d.wg.Add(1)
 	go func() {
@@ -45,10 +45,6 @@ func (d *defragmenter) registerFrag(frag eventFragment) {
 	d.inputCh <- frag
 }
 
-func (d *defragmenter) orderedOut() <-chan eventFragment {
-	return d.outputCh
-}
-
 func (d *defragmenter) defragMsgs(ctx context.Context) {
 	for {
 		select {
@@ -76,7 +72,7 @@ func (d *defragmenter) writeMsgsConsecutive(
 	ctx context.Context,
 	start eventFragment,
 ) {
-	d.outputCh <- start
+	d.outputFn(start)
 
 	d.lastWritten++
 	for {
@@ -89,7 +85,7 @@
 		next := d.lastWritten + 1
 		if frag, ok := d.future[next]; ok {
 			delete(d.future, next)
-			d.outputCh <- frag
+			d.outputFn(frag)
 			d.lastWritten = next
 		} else {
 			return
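The hunks above show the defragmenter's core job: restore sequence order by parking early fragments in the future map and flushing consecutive ones through outputFn once the gap closes. A hedged, self-contained sketch of that reordering logic under simplified types (the real code also reads from inputCh and honors context cancellation):

    package main

    import "fmt"

    type frag struct{ seq uint64 }

    type defrag struct {
        lastWritten uint64
        future      map[uint64]frag
        outputFn    func(frag)
    }

    // push mirrors the registerFrag/writeMsgsConsecutive flow.
    func (d *defrag) push(f frag) {
        if f.seq != d.lastWritten+1 {
            d.future[f.seq] = f // arrived early; park it
            return
        }
        d.outputFn(f)
        d.lastWritten = f.seq
        // Drain consecutive successors that were parked earlier.
        for {
            next, ok := d.future[d.lastWritten+1]
            if !ok {
                return
            }
            delete(d.future, next.seq)
            d.outputFn(next)
            d.lastWritten = next.seq
        }
    }

    func main() {
        d := &defrag{
            future:   map[uint64]frag{},
            outputFn: func(f frag) { fmt.Println("out", f.seq) },
        }
        for _, s := range []uint64{2, 3, 1} { // fragment 1 arrives last
            d.push(frag{seq: s})
        }
        // Prints: out 1, out 2, out 3
    }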
7 changes: 6 additions & 1 deletion cdc/sink/dmlsink/cloudstorage/defragmenter_test.go
@@ -34,7 +34,12 @@ import (
 
 func TestDeframenter(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
+	outputCh := make(chan eventFragment, 1024)
+	outputFn := func(frag eventFragment) {
+		outputCh <- frag
+	}
 	defrag := newDefragmenter(ctx)
+	defrag.outputFn = outputFn
 	uri := "file:///tmp/test"
 	txnCnt := 50
 	sinkURI, err := url.Parse(uri)
@@ -116,7 +121,7 @@ func TestDeframenter(t *testing.T) {
 LOOP:
 	for {
 		select {
-		case frag := <-defrag.orderedOut():
+		case frag := <-outputCh:
 			for _, msg := range frag.encodedMsgs {
 				curSeq, err := strconv.Atoi(string(msg.Key))
 				require.Nil(t, err)
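With orderedOut gone, the test rebuilds the channel view locally: it installs an outputFn closure that forwards each fragment into its own buffered outputCh, and the existing select loop reads from that channel unchanged. This is a handy pattern whenever a channel accessor is replaced by a callback.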
29 changes: 6 additions & 23 deletions cdc/sink/dmlsink/cloudstorage/dml_writer.go
@@ -15,7 +15,6 @@ package cloudstorage
 import (
 	"context"
 
-	"github.com/pingcap/errors"
 	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/cdc/sink/metrics"
@@ -38,7 +37,6 @@ type dmlWriter struct {
 	extension  string
 	clock      clock.Clock
 	statistics *metrics.Statistics
-	inputCh    <-chan eventFragment
 	errCh      chan<- error
 }
 
@@ -48,7 +46,6 @@ func newDMLWriter(
 	config *cloudstorage.Config,
 	extension string,
 	statistics *metrics.Statistics,
-	inputCh <-chan eventFragment,
 	errCh chan<- error,
 ) *dmlWriter {
 	d := &dmlWriter{
@@ -61,7 +58,6 @@
 		config:     config,
 		extension:  extension,
 		statistics: statistics,
-		inputCh:    inputCh,
 		errCh:      errCh,
 	}
 
@@ -83,9 +79,6 @@ func (d *dmlWriter) setClock(clock clock.Clock) {
 
 func (d *dmlWriter) run(ctx context.Context) error {
 	eg, ctx := errgroup.WithContext(ctx)
-	eg.Go(func() error {
-		return d.dispatchFragToDMLWorker(ctx)
-	})
 
 	for i := 0; i < d.config.WorkerCount; i++ {
 		worker := d.workers[i]
@@ -98,22 +91,12 @@
 	return eg.Wait()
 }
 
-func (d *dmlWriter) dispatchFragToDMLWorker(ctx context.Context) error {
-	for {
-		select {
-		case <-ctx.Done():
-			return errors.Trace(ctx.Err())
-		case frag, ok := <-d.inputCh:
-			if !ok {
-				return nil
-			}
-			tableName := frag.versionedTable.TableNameWithPhysicTableID
-			d.hasher.Reset()
-			d.hasher.Write([]byte(tableName.Schema), []byte(tableName.Table))
-			workerID := d.hasher.Sum32() % uint32(d.config.WorkerCount)
-			d.workerChannels[workerID].In() <- frag
-		}
-	}
+func (d *dmlWriter) dispatchFragToDMLWorker(frag eventFragment) {
+	tableName := frag.versionedTable.TableNameWithPhysicTableID
+	d.hasher.Reset()
+	d.hasher.Write([]byte(tableName.Schema), []byte(tableName.Table))
+	workerID := d.hasher.Sum32() % uint32(d.config.WorkerCount)
+	d.workerChannels[workerID].In() <- frag
 }
 
 func (d *dmlWriter) close() {
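dispatchFragToDMLWorker keeps the routing rule it had inside the old loop, now invoked synchronously per fragment: hash the table's schema and name, take the hash modulo WorkerCount, and enqueue on that worker's channel, so all fragments of one table land on the same worker and stay ordered. A sketch of the rule using the standard library's FNV hash (tiflow uses its own hasher type, so the concrete values differ):

    package main

    import (
        "fmt"
        "hash/fnv"
    )

    // workerFor picks a stable worker index for a table, so per-table
    // ordering is preserved across dispatches.
    func workerFor(schema, table string, workerCount uint32) uint32 {
        h := fnv.New32a()
        h.Write([]byte(schema)) // hash.Hash32 is an io.Writer; its Write never errors
        h.Write([]byte(table))
        return h.Sum32() % workerCount
    }

    func main() {
        fmt.Println(workerFor("test", "table1", 4)) // same inputs, same worker
    }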
