Skip to content

Commit

Permalink
owner(ticdc): asynchronously create sink (pingcap#3598) (pingcap#3961)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Feb 24, 2022
1 parent d9af323 commit 75dfaa2
Show file tree
Hide file tree
Showing 9 changed files with 351 additions and 292 deletions.
187 changes: 0 additions & 187 deletions cdc/owner/async_sink.go

This file was deleted.

31 changes: 15 additions & 16 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type changefeed struct {
gcManager gc.Manager

schema *schemaWrap4Owner
sink AsyncSink
sink DDLSink
ddlPuller DDLPuller
initialized bool

Expand All @@ -51,14 +51,13 @@ type changefeed struct {
// After the DDL event has been executed, ddlEventCache will be set to nil.
ddlEventCache *model.DDLEvent

errCh chan error
errCh chan error
// cancel the running goroutine start by `DDLPuller`
cancel context.CancelFunc

// The changefeed will start some backend goroutines in the function `initialize`,
// such as DDLPuller, Sink, etc.
// such as DDLPuller, DDLSink, etc.
// `wg` is used to manage those backend goroutines.
// But it only manages the DDLPuller for now.
// TODO: manage the Sink and other backend goroutines.
wg sync.WaitGroup

metricsChangefeedCheckpointTsGauge prometheus.Gauge
Expand All @@ -67,7 +66,7 @@ type changefeed struct {
metricsChangefeedResolvedTsLagGauge prometheus.Gauge

newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error)
newSink func(ctx cdcContext.Context) (AsyncSink, error)
newSink func() DDLSink
}

func newChangefeed(id model.ChangeFeedID, gcManager gc.Manager) *changefeed {
Expand All @@ -82,15 +81,15 @@ func newChangefeed(id model.ChangeFeedID, gcManager gc.Manager) *changefeed {
cancel: func() {},

newDDLPuller: newDDLPuller,
newSink: newDDLSink,
}
c.newSink = newAsyncSink
return c
}

func newChangefeed4Test(
id model.ChangeFeedID, gcManager gc.Manager,
newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error),
newSink func(ctx cdcContext.Context) (AsyncSink, error),
newSink func() DDLSink,
) *changefeed {
c := newChangefeed(id, gcManager)
c.newDDLPuller = newDDLPuller
Expand Down Expand Up @@ -163,7 +162,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *model.ChangefeedReactor
default:
}

c.sink.EmitCheckpointTs(ctx, checkpointTs)
c.sink.emitCheckpointTs(ctx, checkpointTs)
barrierTs, err := c.handleBarrier(ctx)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -246,12 +245,12 @@ LOOP:
if err != nil {
return errors.Trace(err)
}

cancelCtx, cancel := cdcContext.WithCancel(ctx)
c.cancel = cancel
c.sink, err = c.newSink(cancelCtx)
if err != nil {
return errors.Trace(err)
}

c.sink = c.newSink()
c.sink.run(cancelCtx, cancelCtx.ChangefeedVars().ID, cancelCtx.ChangefeedVars().Info)

// Refer to the previous comment on why we use (checkpointTs-1).
c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs-1)
Expand Down Expand Up @@ -287,7 +286,7 @@ func (c *changefeed) releaseResources() {
ctx, cancel := context.WithCancel(context.Background())
cancel()
// We don't need to wait sink Close, pass a canceled context is ok
if err := c.sink.Close(ctx); err != nil {
if err := c.sink.close(ctx); err != nil {
log.Warn("Closing sink failed in Owner", zap.String("changefeedID", c.state.ID), zap.Error(err))
}
c.wg.Wait()
Expand Down Expand Up @@ -397,7 +396,7 @@ func (c *changefeed) handleBarrier(ctx cdcContext.Context) (uint64, error) {
return barrierTs, nil
}
nextSyncPointTs := oracle.GoTimeToTS(oracle.GetTimeFromTS(barrierTs).Add(c.state.Info.SyncPointInterval))
if err := c.sink.SinkSyncpoint(ctx, barrierTs); err != nil {
if err := c.sink.emitSyncPoint(ctx, barrierTs); err != nil {
return 0, errors.Trace(err)
}
c.barriers.Update(syncPointBarrier, nextSyncPointTs)
Expand Down Expand Up @@ -438,7 +437,7 @@ func (c *changefeed) asyncExecDDL(ctx cdcContext.Context, job *timodel.Job) (don
log.Warn("ignore the DDL job of ineligible table", zap.Reflect("job", job))
return true, nil
}
done, err = c.sink.EmitDDLEvent(ctx, c.ddlEventCache)
done, err = c.sink.emitDDLEvent(ctx, c.ddlEventCache)
if err != nil {
return false, err
}
Expand Down
Loading

0 comments on commit 75dfaa2

Please sign in to comment.