*: port sink flow control for old processor & new processor to release-5.0 #1840

Merged
8 changes: 8 additions & 0 deletions cdc/metrics_processor.go
@@ -82,6 +82,13 @@ var (
Name: "exit_with_error_count",
Help: "counter for processor exits with error",
}, []string{"changefeed", "capture"})
tableMemoryGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "table_memory_consumption",
Help: "estimated memory consumption for a table after the sorter",
}, []string{"changefeed", "capture", "table"})
)

// initProcessorMetrics registers all metrics used in processor
@@ -95,4 +102,5 @@ func initProcessorMetrics(registry *prometheus.Registry) {
registry.MustRegister(txnCounter)
registry.MustRegister(updateInfoDuration)
registry.MustRegister(processorErrorCounter)
registry.MustRegister(tableMemoryGauge)
}
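
Note for readers: the new gauge is labelled per changefeed, capture and table, so the processor has to refresh it periodically and delete the per-table series once the table is removed, otherwise stale label sets pile up. Below is a minimal sketch of that lifecycle, mirroring the processor hunks later in this PR; the helper name, the getConsumption callback and the done channel are illustrative and not part of the actual change.

package sketch

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// reportTableMemory is a hypothetical helper: it publishes the flow
// controller's current consumption every interval and removes the per-table
// series when done is closed, so a dropped table does not leak a label set.
func reportTableMemory(gauge *prometheus.GaugeVec, changefeed, capture, table string,
	getConsumption func() uint64, interval time.Duration, done <-chan struct{}) {
	g := gauge.WithLabelValues(changefeed, capture, table)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			// Drop the series for a removed table.
			gauge.DeleteLabelValues(changefeed, capture, table)
			return
		case <-ticker.C:
			g.Set(float64(getConsumption()))
		}
	}
}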
173 changes: 147 additions & 26 deletions cdc/processor.go
@@ -34,6 +34,8 @@ import (
"github.com/pingcap/ticdc/cdc/puller"
psorter "github.com/pingcap/ticdc/cdc/puller/sorter"
"github.com/pingcap/ticdc/cdc/sink"
"github.com/pingcap/ticdc/cdc/sink/common"
serverConfig "github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/ticdc/pkg/notify"
@@ -58,6 +60,11 @@ const (
defaultSyncResolvedBatch = 1024

schemaStorageGCLag = time.Minute * 20

// for better sink performance under flow control
resolvedTsInterpolateInterval = 200 * time.Millisecond
flushMemoryMetricsDuration = time.Second * 5
flowControlOutChSize = 128
)

type oldProcessor struct {
@@ -919,7 +926,117 @@ func (p *oldProcessor) addTable(ctx context.Context, tableID int64, replicaInfo
syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr).Inc()
}

const maxLagWithCheckpointTs = (30 * 1000) << 18 // 30s
// runFlowControl controls the flow of events out of the sorter.
func (p *oldProcessor) runFlowControl(
ctx context.Context,
tableID model.TableID,
flowController *common.TableFlowController,
inCh <-chan *model.PolymorphicEvent,
outCh chan<- *model.PolymorphicEvent) {
var (
lastSendResolvedTsTime time.Time
lastCRTs, lastSentResolvedTs uint64
)

for {
select {
case <-ctx.Done():
// NOTE: This line is buggy, because `context.Canceled` may indicate an actual error.
// TODO Will be resolved together with other similar problems.
if errors.Cause(ctx.Err()) != context.Canceled {
p.sendError(ctx.Err())
}
return
case event, ok := <-inCh:
if !ok {
// sorter output channel has been closed.
// The sorter must have exited and has a reportable exit reason,
// so we don't need to worry about sending an error here.
log.Info("sorter output channel closed",
zap.Int64("tableID", tableID), util.ZapFieldChangefeed(ctx))
return
}

if event == nil || event.RawKV == nil {
// This is an invariant violation.
log.Panic("unexpected empty event", zap.Reflect("event", event))
}

if event.RawKV.OpType != model.OpTypeResolved {
size := uint64(event.RawKV.ApproximateSize())
commitTs := event.CRTs
// We interpolate a resolved-ts if none has been sent for some time.
if time.Since(lastSendResolvedTsTime) > resolvedTsInterpolateInterval {
// Refer to `cdc/processor/pipeline/sorter.go` for detailed explanation of the design.
// This is a backport.
if lastCRTs > lastSentResolvedTs && commitTs > lastCRTs {
lastSentResolvedTs = lastCRTs
lastSendResolvedTsTime = time.Now()
interpolatedEvent := model.NewResolvedPolymorphicEvent(0, lastCRTs)

select {
case <-ctx.Done():
// TODO fix me
if errors.Cause(ctx.Err()) != context.Canceled {
p.sendError(ctx.Err())
}
return
case outCh <- interpolatedEvent:
}
}
}
// NOTE we allow the quota to be exceeded if blocking means interrupting a transaction.
// Otherwise the pipeline would deadlock.
err := flowController.Consume(commitTs, size, func() error {
if lastCRTs > lastSentResolvedTs {
// If we are blocking, we send a Resolved Event here to elicit a sink-flush.
// Not sending a Resolved Event here will very likely deadlock the pipeline.
// NOTE: This is NOT an optimization, but is for liveness.
lastSentResolvedTs = lastCRTs
lastSendResolvedTsTime = time.Now()

msg := model.NewResolvedPolymorphicEvent(0, lastCRTs)
select {
case <-ctx.Done():
return ctx.Err()
case outCh <- msg:
}
}
return nil
})
if err != nil {
log.Error("flow control error", zap.Error(err))
if cerror.ErrFlowControllerAborted.Equal(err) {
log.Info("flow control cancelled for table",
zap.Int64("tableID", tableID),
util.ZapFieldChangefeed(ctx))
} else {
p.sendError(ctx.Err())
}
return
}
lastCRTs = commitTs
} else {
// handle OpTypeResolved
if event.CRTs < lastSentResolvedTs {
continue
}
lastSentResolvedTs = event.CRTs
lastSendResolvedTsTime = time.Now()
}

select {
case <-ctx.Done():
// TODO fix me
if errors.Cause(ctx.Err()) != context.Canceled {
p.sendError(ctx.Err())
}
return
case outCh <- event:
}
}
}
}
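
Note for readers of this backport: runFlowControl only drives the controller; the quota bookkeeping lives in common.TableFlowController (cdc/sink/common), which is not shown in this diff. The sketch below is therefore not that implementation, only a simplified illustration of the contract the loop above depends on: Consume charges size bytes against a per-table quota, invokes the callback once before blocking so the caller can push a resolved event downstream and let the sink drain, and stays blocked until Release(checkpointTs) frees everything at or below the sink checkpoint; Abort wakes any blocked producer. The real controller also lets a single large transaction exceed the quota (the "NOTE" above), which the sketch omits.

package sketch

import (
	"errors"
	"sync"
)

// entry records how much memory was charged for events with a given commit-ts.
type entry struct {
	commitTs uint64
	size     uint64
}

// quotaController is a simplified stand-in for common.TableFlowController.
type quotaController struct {
	mu       sync.Mutex
	cond     *sync.Cond
	quota    uint64
	consumed uint64
	aborted  bool
	pending  []entry
}

func newQuotaController(quota uint64) *quotaController {
	c := &quotaController{quota: quota}
	c.cond = sync.NewCond(&c.mu)
	return c
}

// Consume charges size bytes for commitTs. If the quota would be exceeded it
// calls blockCallBack once (giving the caller a chance to emit a resolved
// event) and then waits until Release or Abort makes room.
func (c *quotaController) Consume(commitTs, size uint64, blockCallBack func() error) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.consumed+size > c.quota {
		// Drop the lock while running the callback: it may block on a channel
		// send that is only drained after Release has been called.
		c.mu.Unlock()
		err := blockCallBack()
		c.mu.Lock()
		if err != nil {
			return err
		}
		for c.consumed+size > c.quota && !c.aborted {
			c.cond.Wait()
		}
	}
	if c.aborted {
		return errors.New("flow controller aborted") // the real code returns cerror.ErrFlowControllerAborted
	}
	c.consumed += size
	c.pending = append(c.pending, entry{commitTs, size})
	return nil
}

// Release frees everything whose commit-ts is at or below the sink checkpoint.
func (c *quotaController) Release(checkpointTs uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	i := 0
	for ; i < len(c.pending) && c.pending[i].commitTs <= checkpointTs; i++ {
		c.consumed -= c.pending[i].size
	}
	c.pending = append([]entry(nil), c.pending[i:]...)
	c.cond.Broadcast()
}

// Abort wakes any producer blocked in Consume so the table can shut down.
func (c *quotaController) Abort() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.aborted = true
	c.cond.Broadcast()
}

// GetConsumption reports the currently accounted bytes, used for the metric.
func (c *quotaController) GetConsumption() uint64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.consumed
}

The liveness argument in the comments above follows directly from this contract: if the blocking callback did not emit a resolved event, the sink checkpoint could never advance, Release would never run, and Consume would block forever.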

// sorterConsume receives sorted PolymorphicEvent from sorter of each table and
// sends to processor's output chan
@@ -933,7 +1050,7 @@ func (p *oldProcessor) sorterConsume(
replicaInfo *model.TableReplicaInfo,
sink sink.Sink,
) {
var lastResolvedTs, lastCheckPointTs uint64
var lastResolvedTs uint64
opDone := false
resolvedTsGauge := tableResolvedTsGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr, tableName)
checkDoneTicker := time.NewTicker(1 * time.Second)
@@ -1022,6 +1139,19 @@ }
}
defer globalResolvedTsReceiver.Stop()

perTableMemoryQuota := serverConfig.GetGlobalServerConfig().PerTableMemoryQuota
log.Debug("creating table flow controller",
zap.String("table-name", tableName),
zap.Int64("table-id", tableID),
zap.Uint64("quota", perTableMemoryQuota),
util.ZapFieldChangefeed(ctx))

flowController := common.NewTableFlowController(perTableMemoryQuota)
defer func() {
flowController.Abort()
tableMemoryGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr, tableName)
}()

sendResolvedTs2Sink := func() error {
localResolvedTs := atomic.LoadUint64(&p.localResolvedTs)
globalResolvedTs := atomic.LoadUint64(&p.globalResolvedTs)
@@ -1044,52 +1174,43 @@ }
}
return err
}
lastCheckPointTs = checkpointTs

if checkpointTs < replicaInfo.StartTs {
checkpointTs = replicaInfo.StartTs
}

if checkpointTs != 0 {
atomic.StoreUint64(pCheckpointTs, checkpointTs)
flowController.Release(checkpointTs)
p.localCheckpointTsNotifier.Notify()
}
return nil
}

flowControlOutCh := make(chan *model.PolymorphicEvent, flowControlOutChSize)
go func() {
p.runFlowControl(ctx, tableID, flowController, sorter.Output(), flowControlOutCh)
close(flowControlOutCh)
}()

metricsTableMemoryGauge := tableMemoryGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr, tableName)
metricsTicker := time.NewTicker(flushMemoryMetricsDuration)
defer metricsTicker.Stop()

for {
select {
case <-ctx.Done():
if errors.Cause(ctx.Err()) != context.Canceled {
p.sendError(ctx.Err())
}
return
case pEvent := <-sorter.Output():
case <-metricsTicker.C:
metricsTableMemoryGauge.Set(float64(flowController.GetConsumption()))
case pEvent := <-flowControlOutCh:
if pEvent == nil {
continue
}

for lastResolvedTs > maxLagWithCheckpointTs+lastCheckPointTs {
log.Debug("the lag between local checkpoint Ts and local resolved Ts is too lang",
zap.Uint64("resolvedTs", lastResolvedTs), zap.Uint64("lastCheckPointTs", lastCheckPointTs),
zap.Int64("tableID", tableID), util.ZapFieldChangefeed(ctx))
select {
case <-ctx.Done():
if ctx.Err() != context.Canceled {
p.sendError(errors.Trace(ctx.Err()))
}
return
case <-globalResolvedTsReceiver.C:
if err := sendResolvedTs2Sink(); err != nil {
// error is already sent to processor, so we can just ignore it
return
}
case <-checkDoneTicker.C:
if !opDone {
checkDone()
}
}
}

pEvent.SetUpFinishedChan()
select {
case <-ctx.Done():
8 changes: 8 additions & 0 deletions cdc/processor/pipeline/metrics.go
@@ -32,10 +32,18 @@ var (
Name: "txn_count",
Help: "txn count received/executed by this processor",
}, []string{"type", "changefeed", "capture"})
tableMemoryGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "table_memory_consumption",
Help: "estimated memory consumption for a table after the sorter",
}, []string{"changefeed", "capture", "table"})
)

// InitMetrics registers all metrics used in processor
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(tableResolvedTsGauge)
registry.MustRegister(txnCounter)
registry.MustRegister(tableMemoryGauge)
}
9 changes: 8 additions & 1 deletion cdc/processor/pipeline/sink.go
@@ -72,16 +72,20 @@ type sinkNode struct {

eventBuffer []*model.PolymorphicEvent
rowBuffer []*model.RowChangedEvent

flowController tableFlowController
}

func newSinkNode(sink sink.Sink, startTs model.Ts, targetTs model.Ts) *sinkNode {
func newSinkNode(sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode {
return &sinkNode{
sink: sink,
status: TableStatusInitializing,
targetTs: targetTs,
resolvedTs: startTs,
checkpointTs: startTs,
barrierTs: startTs,

flowController: flowController,
}
}

Expand Down Expand Up @@ -130,6 +134,8 @@ func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err
return nil
}
atomic.StoreUint64(&n.checkpointTs, checkpointTs)

n.flowController.Release(checkpointTs)
return nil
}

@@ -216,5 +222,6 @@ func (n *sinkNode) Receive(ctx pipeline.NodeContext) error {

func (n *sinkNode) Destroy(ctx pipeline.NodeContext) error {
n.status.store(TableStatusStopped)
n.flowController.Abort()
return n.sink.Close()
}
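
The sink-node side mirrors the old-processor changes above: the table pipeline is expected to create one common.TableFlowController per table from the server's PerTableMemoryQuota, pass it to newSinkNode, and the node then returns quota to the controller as its checkpoint advances and aborts it in Destroy so a blocked producer can exit. The actual call site (cdc/processor/pipeline/table.go) is not part of this excerpt, so the helper below is only a hedged sketch of that wiring, assuming it sits in the pipeline package next to sinkNode.

package pipeline

import (
	"github.com/pingcap/ticdc/cdc/model"
	"github.com/pingcap/ticdc/cdc/sink"
	"github.com/pingcap/ticdc/cdc/sink/common"
	serverConfig "github.com/pingcap/ticdc/pkg/config"
)

// newTableSinkNode is a hypothetical helper (not in this diff) showing how the
// controller is meant to be shared: the sorter/puller side calls Consume,
// while the sinkNode created here calls Release on flush and Abort on Destroy.
func newTableSinkNode(s sink.Sink, replicaInfo *model.TableReplicaInfo, targetTs model.Ts) (*sinkNode, *common.TableFlowController) {
	perTableQuota := serverConfig.GetGlobalServerConfig().PerTableMemoryQuota
	flowController := common.NewTableFlowController(perTableQuota)
	node := newSinkNode(s, replicaInfo.StartTs, targetTs, flowController)
	return node, flowController
}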
26 changes: 22 additions & 4 deletions cdc/processor/pipeline/sink_test.go
@@ -36,6 +36,24 @@ type mockSink struct {
}
}

// mockFlowController is created because a real tableFlowController cannot be used
// when we are testing sinkNode by itself.
type mockFlowController struct{}

func (c *mockFlowController) Consume(commitTs uint64, size uint64, blockCallBack func() error) error {
return nil
}

func (c *mockFlowController) Release(resolvedTs uint64) {
}

func (c *mockFlowController) Abort() {
}

func (c *mockFlowController) GetConsumption() uint64 {
return 0
}

func (s *mockSink) Initialize(ctx stdContext.Context, tableInfo []*model.SimpleTableInfo) error {
return nil
}
@@ -90,7 +108,7 @@ func (s *outputSuite) TestStatus(c *check.C) {
ctx := context.NewContext(stdContext.Background(), &context.Vars{})

// test stop at targetTs
node := newSinkNode(&mockSink{}, 0, 10)
node := newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

@@ -116,7 +134,7 @@ func (s *outputSuite) TestStatus(c *check.C) {
c.Assert(node.CheckpointTs(), check.Equals, uint64(10))

// test the stop at ts command
node = newSinkNode(&mockSink{}, 0, 10)
node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

@@ -138,7 +156,7 @@ func (s *outputSuite) TestStatus(c *check.C) {
c.Assert(node.CheckpointTs(), check.Equals, uint64(6))

// test the stop at ts command when the stop ts is after the resolvedTs and the checkpointTs is greater than the stop ts
node = newSinkNode(&mockSink{}, 0, 10)
node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

@@ -164,7 +182,7 @@ func (s *outputSuite) TestManyTs(c *check.C) {
defer testleak.AfterTest(c)()
ctx := context.NewContext(stdContext.Background(), &context.Vars{})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10)
node := newSinkNode(sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)
