Skip to content

Commit

Permalink
owner(ticdc): asynchronously create sink (pingcap#3598) (pingcap#3962)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Feb 23, 2022
1 parent 029cba3 commit 220b1d5
Show file tree
Hide file tree
Showing 9 changed files with 351 additions and 292 deletions.
187 changes: 0 additions & 187 deletions cdc/owner/async_sink.go

This file was deleted.

31 changes: 15 additions & 16 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type changefeed struct {
gcManager gc.Manager

schema *schemaWrap4Owner
sink AsyncSink
sink DDLSink
ddlPuller DDLPuller
initialized bool

Expand All @@ -53,14 +53,13 @@ type changefeed struct {
// After the DDL event has been executed, ddlEventCache will be set to nil.
ddlEventCache *model.DDLEvent

errCh chan error
errCh chan error
// cancel the running goroutine start by `DDLPuller`
cancel context.CancelFunc

// The changefeed will start some backend goroutines in the function `initialize`,
// such as DDLPuller, Sink, etc.
// such as DDLPuller, DDLSink, etc.
// `wg` is used to manage those backend goroutines.
// But it only manages the DDLPuller for now.
// TODO: manage the Sink and other backend goroutines.
wg sync.WaitGroup

metricsChangefeedCheckpointTsGauge prometheus.Gauge
Expand All @@ -69,7 +68,7 @@ type changefeed struct {
metricsChangefeedResolvedTsLagGauge prometheus.Gauge

newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error)
newSink func(ctx cdcContext.Context) (AsyncSink, error)
newSink func() DDLSink
}

func newChangefeed(id model.ChangeFeedID, gcManager gc.Manager) *changefeed {
Expand All @@ -84,15 +83,15 @@ func newChangefeed(id model.ChangeFeedID, gcManager gc.Manager) *changefeed {
cancel: func() {},

newDDLPuller: newDDLPuller,
newSink: newDDLSink,
}
c.newSink = newAsyncSink
return c
}

func newChangefeed4Test(
id model.ChangeFeedID, gcManager gc.Manager,
newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error),
newSink func(ctx cdcContext.Context) (AsyncSink, error),
newSink func() DDLSink,
) *changefeed {
c := newChangefeed(id, gcManager)
c.newDDLPuller = newDDLPuller
Expand Down Expand Up @@ -165,7 +164,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *model.ChangefeedReactor
default:
}

c.sink.EmitCheckpointTs(ctx, checkpointTs)
c.sink.emitCheckpointTs(ctx, checkpointTs)
barrierTs, err := c.handleBarrier(ctx)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -248,12 +247,12 @@ LOOP:
if err != nil {
return errors.Trace(err)
}

cancelCtx, cancel := cdcContext.WithCancel(ctx)
c.cancel = cancel
c.sink, err = c.newSink(cancelCtx)
if err != nil {
return errors.Trace(err)
}

c.sink = c.newSink()
c.sink.run(cancelCtx, cancelCtx.ChangefeedVars().ID, cancelCtx.ChangefeedVars().Info)

// Refer to the previous comment on why we use (checkpointTs-1).
c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs-1)
Expand Down Expand Up @@ -289,7 +288,7 @@ func (c *changefeed) releaseResources() {
ctx, cancel := context.WithCancel(context.Background())
cancel()
// We don't need to wait sink Close, pass a canceled context is ok
if err := c.sink.Close(ctx); err != nil {
if err := c.sink.close(ctx); err != nil {
log.Warn("Closing sink failed in Owner", zap.String("changefeedID", c.state.ID), zap.Error(err))
}
c.wg.Wait()
Expand Down Expand Up @@ -399,7 +398,7 @@ func (c *changefeed) handleBarrier(ctx cdcContext.Context) (uint64, error) {
return barrierTs, nil
}
nextSyncPointTs := oracle.GoTimeToTS(oracle.GetTimeFromTS(barrierTs).Add(c.state.Info.SyncPointInterval))
if err := c.sink.SinkSyncpoint(ctx, barrierTs); err != nil {
if err := c.sink.emitSyncPoint(ctx, barrierTs); err != nil {
return 0, errors.Trace(err)
}
c.barriers.Update(syncPointBarrier, nextSyncPointTs)
Expand Down Expand Up @@ -444,7 +443,7 @@ func (c *changefeed) asyncExecDDL(ctx cdcContext.Context, job *timodel.Job) (don
log.Warn("ignore the DDL job of ineligible table", zap.Reflect("job", job))
return true, nil
}
done, err = c.sink.EmitDDLEvent(ctx, c.ddlEventCache)
done, err = c.sink.emitDDLEvent(ctx, c.ddlEventCache)
if err != nil {
return false, err
}
Expand Down
Loading

0 comments on commit 220b1d5

Please sign in to comment.