From c5560f565b8adc32dac4dc13ba0daf8ea5999b71 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Wed, 12 May 2021 15:19:39 +0800 Subject: [PATCH 1/3] *: sink flow control for old processor (#1751) --- Makefile | 2 +- cdc/metrics_processor.go | 7 + cdc/processor.go | 173 +++++++-- cdc/processor/pipeline/metrics.go | 8 + cdc/processor/pipeline/sink.go | 9 +- cdc/processor/pipeline/sink_test.go | 26 +- cdc/processor/pipeline/sorter.go | 92 ++++- cdc/processor/pipeline/table.go | 31 +- cdc/sink/common/flow_control.go | 226 +++++++++++ cdc/sink/common/flow_control_test.go | 558 +++++++++++++++++++++++++++ cmd/server_test.go | 5 +- errors.toml | 10 + pkg/config/config.go | 12 +- pkg/config/config_test.go | 7 +- pkg/errors/errors.go | 4 + pkg/pipeline/pipeline.go | 2 +- 16 files changed, 1127 insertions(+), 45 deletions(-) create mode 100644 cdc/sink/common/flow_control.go create mode 100644 cdc/sink/common/flow_control_test.go diff --git a/Makefile b/Makefile index 9f03b38eb53..1f6315a121e 100644 --- a/Makefile +++ b/Makefile @@ -124,7 +124,7 @@ check_third_party_binary: integration_test_build: check_failpoint_ctl ./scripts/fix_lib_zstd.sh $(FAILPOINT_ENABLE) - $(GOTEST) -ldflags '$(LDFLAGS)' -c -cover -covemode=atomic \ + $(GOTEST) -ldflags '$(LDFLAGS)' -c -cover -covermode=atomic \ -coverpkg=github.com/pingcap/ticdc/... \ -o bin/cdc.test github.com/pingcap/ticdc \ || { $(FAILPOINT_DISABLE); exit 1; } diff --git a/cdc/metrics_processor.go b/cdc/metrics_processor.go index 7cc422b0219..3e209415bb3 100644 --- a/cdc/metrics_processor.go +++ b/cdc/metrics_processor.go @@ -82,6 +82,13 @@ var ( Name: "exit_with_error_count", Help: "counter for processor exits with error", }, []string{"changefeed", "capture"}) + tableMemoryGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "processor", + Name: "table_memory_consumption", + Help: "estimated memory consumption for a table after the sorter", + }, []string{"changefeed", "capture", "table"}) ) // initProcessorMetrics registers all metrics used in processor diff --git a/cdc/processor.go b/cdc/processor.go index 9436d6b884d..bca6c16007b 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -34,6 +34,8 @@ import ( "github.com/pingcap/ticdc/cdc/puller" psorter "github.com/pingcap/ticdc/cdc/puller/sorter" "github.com/pingcap/ticdc/cdc/sink" + "github.com/pingcap/ticdc/cdc/sink/common" + serverConfig "github.com/pingcap/ticdc/pkg/config" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/filter" "github.com/pingcap/ticdc/pkg/notify" @@ -58,6 +60,11 @@ const ( defaultSyncResolvedBatch = 1024 schemaStorageGCLag = time.Minute * 20 + + // for better sink performance under flow control + resolvedTsInterpolateInterval = 200 * time.Millisecond + flushMemoryMetricsDuration = time.Second * 5 + flowControlOutChSize = 128 ) type oldProcessor struct { @@ -915,7 +922,117 @@ func (p *oldProcessor) addTable(ctx context.Context, tableID int64, replicaInfo syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr).Inc() } -const maxLagWithCheckpointTs = (30 * 1000) << 18 // 30s +// runFlowControl controls the flow of events out of the sorter. 
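+// It forwards events from inCh (the sorter output) to outCh, charging each row's
+// approximate size to the per-table flow controller, and interpolates resolved events
+// so that the sink keeps flushing (and releasing quota) while a long transaction is consumed.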
+func (p *oldProcessor) runFlowControl( + ctx context.Context, + tableID model.TableID, + flowController *common.TableFlowController, + inCh <-chan *model.PolymorphicEvent, + outCh chan<- *model.PolymorphicEvent) { + var ( + lastSendResolvedTsTime time.Time + lastCRTs, lastSentResolvedTs uint64 + ) + + for { + select { + case <-ctx.Done(): + // NOTE: This line is buggy, because `context.Canceled` may indicate an actual error. + // TODO Will be resolved together with other similar problems. + if errors.Cause(ctx.Err()) != context.Canceled { + p.sendError(ctx.Err()) + } + return + case event, ok := <-inCh: + if !ok { + // sorter output channel has been closed. + // The sorter must have exited and has a reportable exit reason, + // so we don't need to worry about sending an error here. + log.Info("sorter output channel closed", + zap.Int64("tableID", tableID), util.ZapFieldChangefeed(ctx)) + return + } + + if event == nil || event.RawKV == nil { + // This is an invariant violation. + log.Panic("unexpected empty event", zap.Reflect("event", event)) + } + + if event.RawKV.OpType != model.OpTypeResolved { + size := uint64(event.RawKV.ApproximateSize()) + commitTs := event.CRTs + // We interpolate a resolved-ts if none has been sent for some time. + if time.Since(lastSendResolvedTsTime) > resolvedTsInterpolateInterval { + // Refer to `cdc/processor/pipeline/sorter.go` for detailed explanation of the design. + // This is a backport. + if lastCRTs > lastSentResolvedTs && commitTs > lastCRTs { + lastSentResolvedTs = lastCRTs + lastSendResolvedTsTime = time.Now() + interpolatedEvent := model.NewResolvedPolymorphicEvent(0, lastCRTs) + + select { + case <-ctx.Done(): + // TODO fix me + if errors.Cause(ctx.Err()) != context.Canceled { + p.sendError(ctx.Err()) + } + return + case outCh <- interpolatedEvent: + } + } + } + // NOTE we allow the quota to be exceeded if blocking means interrupting a transaction. + // Otherwise the pipeline would deadlock. + err := flowController.Consume(commitTs, size, func() error { + if lastCRTs > lastSentResolvedTs { + // If we are blocking, we send a Resolved Event here to elicit a sink-flush. + // Not sending a Resolved Event here will very likely deadlock the pipeline. + // NOTE: This is NOT an optimization, but is for liveness. 
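+						// The flush advances the sink checkpoint, which in turn calls
+						// flowController.Release and frees quota, letting this Consume call return.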
+ lastSentResolvedTs = lastCRTs + lastSendResolvedTsTime = time.Now() + + msg := model.NewResolvedPolymorphicEvent(0, lastCRTs) + select { + case <-ctx.Done(): + return ctx.Err() + case outCh <- msg: + } + } + return nil + }) + if err != nil { + log.Error("flow control error", zap.Error(err)) + if cerror.ErrFlowControllerAborted.Equal(err) { + log.Info("flow control cancelled for table", + zap.Int64("tableID", tableID), + util.ZapFieldChangefeed(ctx)) + } else { + p.sendError(ctx.Err()) + } + return + } + lastCRTs = commitTs + } else { + // handle OpTypeResolved + if event.CRTs < lastSentResolvedTs { + continue + } + lastSentResolvedTs = event.CRTs + lastSendResolvedTsTime = time.Now() + } + + select { + case <-ctx.Done(): + // TODO fix me + if errors.Cause(ctx.Err()) != context.Canceled { + p.sendError(ctx.Err()) + } + return + case outCh <- event: + } + } + } +} // sorterConsume receives sorted PolymorphicEvent from sorter of each table and // sends to processor's output chan @@ -929,7 +1046,7 @@ func (p *oldProcessor) sorterConsume( replicaInfo *model.TableReplicaInfo, sink sink.Sink, ) { - var lastResolvedTs, lastCheckPointTs uint64 + var lastResolvedTs uint64 opDone := false resolvedTsGauge := tableResolvedTsGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr, tableName) checkDoneTicker := time.NewTicker(1 * time.Second) @@ -1018,6 +1135,19 @@ func (p *oldProcessor) sorterConsume( } defer globalResolvedTsReceiver.Stop() + perTableMemoryQuota := serverConfig.GetGlobalServerConfig().PerTableMemoryQuota + log.Debug("creating table flow controller", + zap.String("table-name", tableName), + zap.Int64("table-id", tableID), + zap.Uint64("quota", perTableMemoryQuota), + util.ZapFieldChangefeed(ctx)) + + flowController := common.NewTableFlowController(perTableMemoryQuota) + defer func() { + flowController.Abort() + tableMemoryGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr, tableName) + }() + sendResolvedTs2Sink := func() error { localResolvedTs := atomic.LoadUint64(&p.localResolvedTs) globalResolvedTs := atomic.LoadUint64(&p.globalResolvedTs) @@ -1040,7 +1170,6 @@ func (p *oldProcessor) sorterConsume( } return err } - lastCheckPointTs = checkpointTs if checkpointTs < replicaInfo.StartTs { checkpointTs = replicaInfo.StartTs @@ -1048,10 +1177,22 @@ func (p *oldProcessor) sorterConsume( if checkpointTs != 0 { atomic.StoreUint64(pCheckpointTs, checkpointTs) + flowController.Release(checkpointTs) p.localCheckpointTsNotifier.Notify() } return nil } + + flowControlOutCh := make(chan *model.PolymorphicEvent, flowControlOutChSize) + go func() { + p.runFlowControl(ctx, tableID, flowController, sorter.Output(), flowControlOutCh) + close(flowControlOutCh) + }() + + metricsTableMemoryGauge := tableMemoryGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr, tableName) + metricsTicker := time.NewTicker(flushMemoryMetricsDuration) + defer metricsTicker.Stop() + for { select { case <-ctx.Done(): @@ -1059,33 +1200,13 @@ func (p *oldProcessor) sorterConsume( p.sendError(ctx.Err()) } return - case pEvent := <-sorter.Output(): + case <-metricsTicker.C: + metricsTableMemoryGauge.Set(float64(flowController.GetConsumption())) + case pEvent := <-flowControlOutCh: if pEvent == nil { continue } - for lastResolvedTs > maxLagWithCheckpointTs+lastCheckPointTs { - log.Debug("the lag between local checkpoint Ts and local resolved Ts is too lang", - zap.Uint64("resolvedTs", lastResolvedTs), zap.Uint64("lastCheckPointTs", lastCheckPointTs), - zap.Int64("tableID", tableID), 
util.ZapFieldChangefeed(ctx)) - select { - case <-ctx.Done(): - if ctx.Err() != context.Canceled { - p.sendError(errors.Trace(ctx.Err())) - } - return - case <-globalResolvedTsReceiver.C: - if err := sendResolvedTs2Sink(); err != nil { - // error is already sent to processor, so we can just ignore it - return - } - case <-checkDoneTicker.C: - if !opDone { - checkDone() - } - } - } - pEvent.SetUpFinishedChan() select { case <-ctx.Done(): diff --git a/cdc/processor/pipeline/metrics.go b/cdc/processor/pipeline/metrics.go index 8511624c7a3..f2c4c6c0e3a 100644 --- a/cdc/processor/pipeline/metrics.go +++ b/cdc/processor/pipeline/metrics.go @@ -32,10 +32,18 @@ var ( Name: "txn_count", Help: "txn count received/executed by this processor", }, []string{"type", "changefeed", "capture"}) + tableMemoryGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "processor", + Name: "table_memory_consumption", + Help: "estimated memory consumption for a table after the sorter", + }, []string{"changefeed", "capture", "table"}) ) // InitMetrics registers all metrics used in processor func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(tableResolvedTsGauge) registry.MustRegister(txnCounter) + registry.MustRegister(tableMemoryGauge) } diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index 9932c21f33a..02eb0bdba54 100644 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -72,9 +72,11 @@ type sinkNode struct { eventBuffer []*model.PolymorphicEvent rowBuffer []*model.RowChangedEvent + + flowController tableFlowController } -func newSinkNode(sink sink.Sink, startTs model.Ts, targetTs model.Ts) *sinkNode { +func newSinkNode(sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode { return &sinkNode{ sink: sink, status: TableStatusInitializing, @@ -82,6 +84,8 @@ func newSinkNode(sink sink.Sink, startTs model.Ts, targetTs model.Ts) *sinkNode resolvedTs: startTs, checkpointTs: startTs, barrierTs: startTs, + + flowController: flowController, } } @@ -130,6 +134,8 @@ func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err return nil } atomic.StoreUint64(&n.checkpointTs, checkpointTs) + + n.flowController.Release(checkpointTs) return nil } @@ -216,5 +222,6 @@ func (n *sinkNode) Receive(ctx pipeline.NodeContext) error { func (n *sinkNode) Destroy(ctx pipeline.NodeContext) error { n.status.store(TableStatusStopped) + n.flowController.Abort() return n.sink.Close() } diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index d45ace78881..34b02440a8d 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -36,6 +36,24 @@ type mockSink struct { } } +// mockFlowController is created because a real tableFlowController cannot be used +// we are testing sinkNode by itself. 
+type mockFlowController struct{} + +func (c *mockFlowController) Consume(commitTs uint64, size uint64, blockCallBack func() error) error { + return nil +} + +func (c *mockFlowController) Release(resolvedTs uint64) { +} + +func (c *mockFlowController) Abort() { +} + +func (c *mockFlowController) GetConsumption() uint64 { + return 0 +} + func (s *mockSink) Initialize(ctx stdContext.Context, tableInfo []*model.SimpleTableInfo) error { return nil } @@ -90,7 +108,7 @@ func (s *outputSuite) TestStatus(c *check.C) { ctx := context.NewContext(stdContext.Background(), &context.Vars{}) // test stop at targetTs - node := newSinkNode(&mockSink{}, 0, 10) + node := newSinkNode(&mockSink{}, 0, 10, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusInitializing) @@ -116,7 +134,7 @@ func (s *outputSuite) TestStatus(c *check.C) { c.Assert(node.CheckpointTs(), check.Equals, uint64(10)) // test the stop at ts command - node = newSinkNode(&mockSink{}, 0, 10) + node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusInitializing) @@ -138,7 +156,7 @@ func (s *outputSuite) TestStatus(c *check.C) { c.Assert(node.CheckpointTs(), check.Equals, uint64(6)) // test the stop at ts command is after then resolvedTs and checkpointTs is greater than stop ts - node = newSinkNode(&mockSink{}, 0, 10) + node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusInitializing) @@ -164,7 +182,7 @@ func (s *outputSuite) TestManyTs(c *check.C) { defer testleak.AfterTest(c)() ctx := context.NewContext(stdContext.Background(), &context.Vars{}) sink := &mockSink{} - node := newSinkNode(sink, 0, 10) + node := newSinkNode(sink, 0, 10, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil) c.Assert(node.Status(), check.Equals, TableStatusInitializing) diff --git a/cdc/processor/pipeline/sorter.go b/cdc/processor/pipeline/sorter.go index 31c48f31f2b..8329a61aa8b 100644 --- a/cdc/processor/pipeline/sorter.go +++ b/cdc/processor/pipeline/sorter.go @@ -16,18 +16,25 @@ package pipeline import ( "context" "os" + "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/cdc/puller" psorter "github.com/pingcap/ticdc/cdc/puller/sorter" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/pipeline" "github.com/pingcap/ticdc/pkg/util" + "go.uber.org/zap" "golang.org/x/sync/errgroup" ) +const ( + flushMemoryMetricsDuration = time.Second * 5 +) + type sorterNode struct { sortEngine model.SortEngine sortDir string @@ -37,6 +44,9 @@ type sorterNode struct { tableID model.TableID tableName string // quoted schema and table, used in metircs only + // for per-table flow control + flowController tableFlowController + wg errgroup.Group cancel context.CancelFunc } @@ -45,7 +55,8 @@ func newSorterNode( sortEngine model.SortEngine, sortDir string, changeFeedID model.ChangeFeedID, - tableName string, tableID model.TableID) pipeline.Node { + tableName string, tableID model.TableID, + flowController tableFlowController) pipeline.Node { return &sorterNode{ sortEngine: sortEngine, sortDir: sortDir, @@ -53,6 +64,8 @@ func newSorterNode( 
changeFeedID: changeFeedID, tableID: tableID, tableName: tableName, + + flowController: flowController, } } @@ -94,13 +107,83 @@ func (n *sorterNode) Init(ctx pipeline.NodeContext) error { return nil }) n.wg.Go(func() error { + // Since the flowController is implemented by `Cond`, it is not cancelable + // by a context. We need to listen on cancellation and aborts the flowController + // manually. + <-stdCtx.Done() + n.flowController.Abort() + return nil + }) + n.wg.Go(func() error { + lastSentResolvedTs := uint64(0) + lastSendResolvedTsTime := time.Now() // the time at which we last sent a resolved-ts. + lastCRTs := uint64(0) // the commit-ts of the last row changed we sent. + + metricsTableMemoryGauge := tableMemoryGauge.WithLabelValues(n.changeFeedID, ctx.Vars().CaptureAddr, n.tableName) + metricsTicker := time.NewTicker(flushMemoryMetricsDuration) + defer metricsTicker.Stop() + for { select { case <-stdCtx.Done(): return nil - case msg := <-sorter.Output(): - if msg == nil { - continue + case <-metricsTicker.C: + metricsTableMemoryGauge.Set(float64(n.flowController.GetConsumption())) + case msg, ok := <-sorter.Output(): + if !ok { + // sorter output channel closed + return nil + } + if msg == nil || msg.RawKV == nil { + log.Panic("unexpected empty msg", zap.Reflect("msg", msg)) + } + if msg.RawKV.OpType != model.OpTypeResolved { + size := uint64(msg.RawKV.ApproximateSize()) + commitTs := msg.CRTs + // We interpolate a resolved-ts if none has been sent for some time. + if time.Since(lastSendResolvedTsTime) > resolvedTsInterpolateInterval { + // checks the condition: cur_event_commit_ts > prev_event_commit_ts > last_resolved_ts + // If this is true, it implies that (1) the last transaction has finished, and we are processing + // the first event in a new transaction, (2) a resolved-ts prev_event_commit_ts is safe to be sent, + // but it has not yet. + // This means that we can interpolate prev_event_commit_ts as a resolved-ts, improving the frequency + // at which the sink flushes. + if lastCRTs > lastSentResolvedTs && commitTs > lastCRTs { + lastSentResolvedTs = lastCRTs + lastSendResolvedTsTime = time.Now() + ctx.SendToNextNode(pipeline.PolymorphicEventMessage(model.NewResolvedPolymorphicEvent(0, lastCRTs))) + } + } + // NOTE we allow the quota to be exceeded if blocking means interrupting a transaction. + // Otherwise the pipeline would deadlock. + err := n.flowController.Consume(commitTs, size, func() error { + if lastCRTs > lastSentResolvedTs { + // If we are blocking, we send a Resolved Event here to elicit a sink-flush. + // Not sending a Resolved Event here will very likely deadlock the pipeline. 
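+							// The sink node releases quota in flushSink once its checkpoint
+							// advances, which is what eventually unblocks this Consume call.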
+ lastSentResolvedTs = lastCRTs + lastSendResolvedTsTime = time.Now() + ctx.SendToNextNode(pipeline.PolymorphicEventMessage(model.NewResolvedPolymorphicEvent(0, lastCRTs))) + } + return nil + }) + if err != nil { + if cerror.ErrFlowControllerAborted.Equal(err) { + log.Info("flow control cancelled for table", + zap.Int64("tableID", n.tableID), + zap.String("tableName", n.tableName)) + } else { + ctx.Throw(err) + } + return nil + } + lastCRTs = commitTs + } else { + // handle OpTypeResolved + if msg.CRTs < lastSentResolvedTs { + continue + } + lastSentResolvedTs = msg.CRTs + lastSendResolvedTsTime = time.Now() } ctx.SendToNextNode(pipeline.PolymorphicEventMessage(msg)) } @@ -123,6 +206,7 @@ func (n *sorterNode) Receive(ctx pipeline.NodeContext) error { } func (n *sorterNode) Destroy(ctx pipeline.NodeContext) error { + defer tableMemoryGauge.DeleteLabelValues(n.changeFeedID, ctx.Vars().CaptureAddr, n.tableName) n.cancel() return n.wg.Wait() } diff --git a/cdc/processor/pipeline/table.go b/cdc/processor/pipeline/table.go index f218ff65a94..cb47f09d05e 100644 --- a/cdc/processor/pipeline/table.go +++ b/cdc/processor/pipeline/table.go @@ -17,12 +17,13 @@ import ( stdContext "context" "time" - "github.com/pingcap/ticdc/cdc/sink" - "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/entry" "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/cdc/puller" + "github.com/pingcap/ticdc/cdc/sink" + "github.com/pingcap/ticdc/cdc/sink/common" + serverConfig "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/context" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/pipeline" @@ -31,6 +32,12 @@ import ( "go.uber.org/zap" ) +const ( + // TODO determine a reasonable default value + // This is part of sink performance optimization + resolvedTsInterpolateInterval = 200 * time.Millisecond +) + // TablePipeline is a pipeline which capture the change log from tikv in a table type TablePipeline interface { // ID returns the ID of source table and mark table @@ -66,6 +73,15 @@ type tablePipelineImpl struct { cancel stdContext.CancelFunc } +// TODO find a better name or avoid using an interface +// We use an interface here for ease in unit testing. 
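+// common.TableFlowController is the production implementation; sink_test.go plugs in a no-op mock.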
+type tableFlowController interface { + Consume(commitTs uint64, size uint64, blockCallBack func() error) error + Release(resolvedTs uint64) + Abort() + GetConsumption() uint64 +} + // ResolvedTs returns the resolved ts in this table pipeline func (t *tablePipelineImpl) ResolvedTs() model.Ts { return t.sinkNode.ResolvedTs() @@ -155,14 +171,21 @@ func NewTablePipeline(ctx context.Context, } ctx, p := pipeline.NewPipeline(ctx, 500*time.Millisecond) + perTableMemoryQuota := serverConfig.GetGlobalServerConfig().PerTableMemoryQuota + log.Debug("creating table flow controller", + zap.String("changefeed-id", changefeedID), + zap.String("table-name", tableName), + zap.Int64("table-id", tableID), + zap.Uint64("quota", perTableMemoryQuota)) + flowController := common.NewTableFlowController(perTableMemoryQuota) p.AppendNode(ctx, "puller", newPullerNode(changefeedID, credential, kvStorage, limitter, tableID, replicaInfo, tableName)) - p.AppendNode(ctx, "sorter", newSorterNode(sortEngine, sortDir, changefeedID, tableName, tableID)) + p.AppendNode(ctx, "sorter", newSorterNode(sortEngine, sortDir, changefeedID, tableName, tableID, flowController)) p.AppendNode(ctx, "mounter", newMounterNode(mounter)) config := ctx.Vars().Config if config.Cyclic != nil && config.Cyclic.IsEnabled() { p.AppendNode(ctx, "cyclic", newCyclicMarkNode(replicaInfo.MarkTableID)) } - tablePipeline.sinkNode = newSinkNode(sink, replicaInfo.StartTs, targetTs) + tablePipeline.sinkNode = newSinkNode(sink, replicaInfo.StartTs, targetTs, flowController) p.AppendNode(ctx, "sink", tablePipeline.sinkNode) tablePipeline.p = p return ctx, tablePipeline diff --git a/cdc/sink/common/flow_control.go b/cdc/sink/common/flow_control.go new file mode 100644 index 00000000000..8f650a4f24f --- /dev/null +++ b/cdc/sink/common/flow_control.go @@ -0,0 +1,226 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "log" + "sync" + "sync/atomic" + + "github.com/edwingeng/deque" + "github.com/pingcap/errors" + cerrors "github.com/pingcap/ticdc/pkg/errors" + "go.uber.org/zap" +) + +// TableMemoryQuota is designed to curb the total memory consumption of processing +// the event streams in a table. +// A higher-level controller more suitable for direct use by the processor is TableFlowController. +type TableMemoryQuota struct { + Quota uint64 // should not be changed once intialized + + IsAborted uint32 + + mu sync.Mutex + Consumed uint64 + + cond *sync.Cond +} + +// NewTableMemoryQuota creates a new TableMemoryQuota +// quota: max advised memory consumption in bytes. +func NewTableMemoryQuota(quota uint64) *TableMemoryQuota { + ret := &TableMemoryQuota{ + Quota: quota, + mu: sync.Mutex{}, + Consumed: 0, + } + + ret.cond = sync.NewCond(&ret.mu) + return ret +} + +// ConsumeWithBlocking is called when a hard-limit is needed. The method will +// block until enough memory has been freed up by Release. +// blockCallBack will be called if the function will block. +// Should be used with care to prevent deadlock. 
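+// It returns ErrFlowControllerAborted if Abort is called while waiting, and
+// ErrFlowControllerEventLargerThanQuota if a single event exceeds the whole quota.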
+func (c *TableMemoryQuota) ConsumeWithBlocking(nBytes uint64, blockCallBack func() error) error { + if nBytes >= c.Quota { + return cerrors.ErrFlowControllerEventLargerThanQuota.GenWithStackByArgs(nBytes, c.Quota) + } + + c.mu.Lock() + defer c.mu.Unlock() + + calledBack := false + for { + if atomic.LoadUint32(&c.IsAborted) == 1 { + return cerrors.ErrFlowControllerAborted.GenWithStackByArgs() + } + if c.Consumed+nBytes < c.Quota { + break + } + + if !calledBack { + calledBack = true + err := blockCallBack() + if err != nil { + return errors.Trace(err) + } + } + c.cond.Wait() + } + + c.Consumed += nBytes + return nil +} + +// ForceConsume is called when blocking is not acceptable and the limit can be violated +// for the sake of avoid deadlock. It merely records the increased memory consumption. +func (c *TableMemoryQuota) ForceConsume(nBytes uint64) error { + c.mu.Lock() + defer c.mu.Unlock() + + if atomic.LoadUint32(&c.IsAborted) == 1 { + return cerrors.ErrFlowControllerAborted.GenWithStackByArgs() + } + + c.Consumed += nBytes + return nil +} + +// Release is called when a chuck of memory is done being used. +func (c *TableMemoryQuota) Release(nBytes uint64) { + c.mu.Lock() + + if c.Consumed < nBytes { + c.mu.Unlock() + log.Panic("TableMemoryQuota: releasing more than consumed, report a bug", + zap.Uint64("consumed", c.Consumed), + zap.Uint64("released", nBytes)) + } + + c.Consumed -= nBytes + if c.Consumed < c.Quota { + c.mu.Unlock() + c.cond.Signal() + return + } + + c.mu.Unlock() +} + +// Abort interrupts any ongoing ConsumeWithBlocking call +func (c *TableMemoryQuota) Abort() { + atomic.StoreUint32(&c.IsAborted, 1) + c.cond.Signal() +} + +// GetConsumption returns the current memory consumption +func (c *TableMemoryQuota) GetConsumption() uint64 { + c.mu.Lock() + defer c.mu.Unlock() + + return c.Consumed +} + +// TableFlowController provides a convenient interface to control the memory consumption of a per table event stream +type TableFlowController struct { + memoryQuota *TableMemoryQuota + + mu sync.Mutex + queue deque.Deque + + lastCommitTs uint64 +} + +type commitTsSizeEntry struct { + CommitTs uint64 + Size uint64 +} + +// NewTableFlowController creates a new TableFlowController +func NewTableFlowController(quota uint64) *TableFlowController { + return &TableFlowController{ + memoryQuota: NewTableMemoryQuota(quota), + queue: deque.NewDeque(), + } +} + +// Consume is called when an event has arrived for being processed by the sink. +// It will handle transaction boundaries automatically, and will not block intra-transaction. +func (c *TableFlowController) Consume(commitTs uint64, size uint64, blockCallBack func() error) error { + lastCommitTs := atomic.LoadUint64(&c.lastCommitTs) + + if commitTs < lastCommitTs { + log.Panic("commitTs regressed, report a bug", + zap.Uint64("commitTs", commitTs), + zap.Uint64("lastCommitTs", c.lastCommitTs)) + } + + if commitTs > lastCommitTs { + atomic.StoreUint64(&c.lastCommitTs, commitTs) + err := c.memoryQuota.ConsumeWithBlocking(size, blockCallBack) + if err != nil { + return errors.Trace(err) + } + } else { + // Here commitTs == lastCommitTs, which means that we are not crossing + // a transaction boundary. In this situation, we use `ForceConsume` because + // blocking the event stream mid-transaction is highly likely to cause + // a deadlock. + // TODO fix this in the future, after we figure out how to elegantly support large txns. 
+ err := c.memoryQuota.ForceConsume(size) + if err != nil { + return errors.Trace(err) + } + } + + c.mu.Lock() + defer c.mu.Unlock() + c.queue.PushBack(&commitTsSizeEntry{ + CommitTs: commitTs, + Size: size, + }) + + return nil +} + +// Release is called when all events committed before resolvedTs has been freed from memory. +func (c *TableFlowController) Release(resolvedTs uint64) { + var nBytesToRelease uint64 + + c.mu.Lock() + for c.queue.Len() > 0 { + if peeked := c.queue.Front().(*commitTsSizeEntry); peeked.CommitTs <= resolvedTs { + nBytesToRelease += peeked.Size + c.queue.PopFront() + } else { + break + } + } + c.mu.Unlock() + + c.memoryQuota.Release(nBytesToRelease) +} + +// Abort interrupts any ongoing Consume call +func (c *TableFlowController) Abort() { + c.memoryQuota.Abort() +} + +// GetConsumption returns the current memory consumption +func (c *TableFlowController) GetConsumption() uint64 { + return c.memoryQuota.GetConsumption() +} diff --git a/cdc/sink/common/flow_control_test.go b/cdc/sink/common/flow_control_test.go new file mode 100644 index 00000000000..a31fbf3432a --- /dev/null +++ b/cdc/sink/common/flow_control_test.go @@ -0,0 +1,558 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "context" + "math/rand" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/pingcap/check" + "github.com/pingcap/ticdc/pkg/util/testleak" + "golang.org/x/sync/errgroup" +) + +type flowControlSuite struct{} + +var _ = check.Suite(&flowControlSuite{}) + +func dummyCallBack() error { + return nil +} + +type mockCallBacker struct { + timesCalled int + injectedErr error +} + +func (c *mockCallBacker) cb() error { + c.timesCalled += 1 + return c.injectedErr +} + +func (s *flowControlSuite) TestMemoryQuotaBasic(c *check.C) { + defer testleak.AfterTest(c)() + + controller := NewTableMemoryQuota(1024) + sizeCh := make(chan uint64, 1024) + var ( + wg sync.WaitGroup + consumed uint64 + ) + + wg.Add(1) + go func() { + defer wg.Done() + + for i := 0; i < 100000; i++ { + size := (rand.Int() % 128) + 128 + err := controller.ConsumeWithBlocking(uint64(size), dummyCallBack) + c.Assert(err, check.IsNil) + + c.Assert(atomic.AddUint64(&consumed, uint64(size)), check.Less, uint64(1024)) + sizeCh <- uint64(size) + } + + close(sizeCh) + }() + + wg.Add(1) + go func() { + defer wg.Done() + + for size := range sizeCh { + c.Assert(atomic.LoadUint64(&consumed), check.GreaterEqual, size) + atomic.AddUint64(&consumed, -size) + controller.Release(size) + } + }() + + wg.Wait() + c.Assert(atomic.LoadUint64(&consumed), check.Equals, uint64(0)) + c.Assert(controller.GetConsumption(), check.Equals, uint64(0)) +} + +func (s *flowControlSuite) TestMemoryQuotaForceConsume(c *check.C) { + defer testleak.AfterTest(c)() + + controller := NewTableMemoryQuota(1024) + sizeCh := make(chan uint64, 1024) + var ( + wg sync.WaitGroup + consumed uint64 + ) + + wg.Add(1) + go func() { + defer wg.Done() + + for i := 0; i < 100000; i++ { + size := (rand.Int() % 128) + 128 + + if rand.Int()%3 == 0 { + err := 
controller.ConsumeWithBlocking(uint64(size), dummyCallBack) + c.Assert(err, check.IsNil) + c.Assert(atomic.AddUint64(&consumed, uint64(size)), check.Less, uint64(1024)) + } else { + err := controller.ForceConsume(uint64(size)) + c.Assert(err, check.IsNil) + atomic.AddUint64(&consumed, uint64(size)) + } + sizeCh <- uint64(size) + } + + close(sizeCh) + }() + + wg.Add(1) + go func() { + defer wg.Done() + + for size := range sizeCh { + c.Assert(atomic.LoadUint64(&consumed), check.GreaterEqual, size) + atomic.AddUint64(&consumed, -size) + controller.Release(size) + } + }() + + wg.Wait() + c.Assert(atomic.LoadUint64(&consumed), check.Equals, uint64(0)) +} + +// TestMemoryQuotaAbort verifies that Abort works +func (s *flowControlSuite) TestMemoryQuotaAbort(c *check.C) { + defer testleak.AfterTest(c)() + + controller := NewTableMemoryQuota(1024) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := controller.ConsumeWithBlocking(700, dummyCallBack) + c.Assert(err, check.IsNil) + + err = controller.ConsumeWithBlocking(700, dummyCallBack) + c.Assert(err, check.ErrorMatches, ".*ErrFlowControllerAborted.*") + + err = controller.ForceConsume(700) + c.Assert(err, check.ErrorMatches, ".*ErrFlowControllerAborted.*") + }() + + time.Sleep(2 * time.Second) + controller.Abort() + + wg.Wait() +} + +// TestMemoryQuotaReleaseZero verifies that releasing 0 bytes is successful +func (s *flowControlSuite) TestMemoryQuotaReleaseZero(c *check.C) { + defer testleak.AfterTest(c)() + + controller := NewTableMemoryQuota(1024) + controller.Release(0) +} + +type mockedEvent struct { + resolvedTs uint64 + size uint64 +} + +func (s *flowControlSuite) TestFlowControlBasic(c *check.C) { + defer testleak.AfterTest(c)() + var consumedBytes uint64 + ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5) + defer cancel() + errg, ctx := errgroup.WithContext(ctx) + mockedRowsCh := make(chan *commitTsSizeEntry, 1024) + flowController := NewTableFlowController(2048) + + errg.Go(func() error { + lastCommitTs := uint64(1) + for i := 0; i < 100000; i++ { + if rand.Int()%15 == 0 { + lastCommitTs += 10 + } + size := uint64(128 + rand.Int()%64) + select { + case <-ctx.Done(): + return ctx.Err() + case mockedRowsCh <- &commitTsSizeEntry{ + CommitTs: lastCommitTs, + Size: size, + }: + } + } + + close(mockedRowsCh) + return nil + }) + + eventCh := make(chan *mockedEvent, 1024) + errg.Go(func() error { + defer close(eventCh) + resolvedTs := uint64(0) + for { + var mockedRow *commitTsSizeEntry + select { + case <-ctx.Done(): + return ctx.Err() + case mockedRow = <-mockedRowsCh: + } + + if mockedRow == nil { + break + } + + atomic.AddUint64(&consumedBytes, mockedRow.Size) + updatedResolvedTs := false + if resolvedTs != mockedRow.CommitTs { + c.Assert(resolvedTs, check.Less, mockedRow.CommitTs) + select { + case <-ctx.Done(): + return ctx.Err() + case eventCh <- &mockedEvent{ + resolvedTs: resolvedTs, + }: + } + resolvedTs = mockedRow.CommitTs + updatedResolvedTs = true + } + err := flowController.Consume(mockedRow.CommitTs, mockedRow.Size, dummyCallBack) + c.Check(err, check.IsNil) + select { + case <-ctx.Done(): + return ctx.Err() + case eventCh <- &mockedEvent{ + size: mockedRow.Size, + }: + } + if updatedResolvedTs { + // new Txn + c.Assert(atomic.LoadUint64(&consumedBytes), check.Less, uint64(2048)) + c.Assert(flowController.GetConsumption(), check.Less, uint64(2048)) + } + } + select { + case <-ctx.Done(): + return ctx.Err() + case eventCh <- &mockedEvent{ + resolvedTs: resolvedTs, + }: + } + + return nil + 
}) + + errg.Go(func() error { + for { + var event *mockedEvent + select { + case <-ctx.Done(): + return ctx.Err() + case event = <-eventCh: + } + + if event == nil { + break + } + + if event.size != 0 { + atomic.AddUint64(&consumedBytes, -event.size) + } else { + flowController.Release(event.resolvedTs) + } + } + + return nil + }) + + c.Assert(errg.Wait(), check.IsNil) + c.Assert(atomic.LoadUint64(&consumedBytes), check.Equals, uint64(0)) +} + +func (s *flowControlSuite) TestFlowControlAbort(c *check.C) { + defer testleak.AfterTest(c)() + + callBacker := &mockCallBacker{} + controller := NewTableFlowController(1024) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + + err := controller.Consume(1, 1000, callBacker.cb) + c.Assert(err, check.IsNil) + c.Assert(callBacker.timesCalled, check.Equals, 0) + err = controller.Consume(2, 1000, callBacker.cb) + c.Assert(err, check.ErrorMatches, ".*ErrFlowControllerAborted.*") + c.Assert(callBacker.timesCalled, check.Equals, 1) + err = controller.Consume(2, 10, callBacker.cb) + c.Assert(err, check.ErrorMatches, ".*ErrFlowControllerAborted.*") + c.Assert(callBacker.timesCalled, check.Equals, 1) + }() + + time.Sleep(3 * time.Second) + controller.Abort() + + wg.Wait() +} + +func (s *flowControlSuite) TestFlowControlCallBack(c *check.C) { + defer testleak.AfterTest(c)() + var consumedBytes uint64 + ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5) + defer cancel() + errg, ctx := errgroup.WithContext(ctx) + mockedRowsCh := make(chan *commitTsSizeEntry, 1024) + flowController := NewTableFlowController(512) + + errg.Go(func() error { + lastCommitTs := uint64(1) + for i := 0; i < 100000; i++ { + if rand.Int()%15 == 0 { + lastCommitTs += 10 + } + size := uint64(128 + rand.Int()%64) + select { + case <-ctx.Done(): + return ctx.Err() + case mockedRowsCh <- &commitTsSizeEntry{ + CommitTs: lastCommitTs, + Size: size, + }: + } + } + + close(mockedRowsCh) + return nil + }) + + eventCh := make(chan *mockedEvent, 1024) + errg.Go(func() error { + defer close(eventCh) + lastCRTs := uint64(0) + for { + var mockedRow *commitTsSizeEntry + select { + case <-ctx.Done(): + return ctx.Err() + case mockedRow = <-mockedRowsCh: + } + + if mockedRow == nil { + break + } + + atomic.AddUint64(&consumedBytes, mockedRow.Size) + err := flowController.Consume(mockedRow.CommitTs, mockedRow.Size, func() error { + select { + case <-ctx.Done(): + return ctx.Err() + case eventCh <- &mockedEvent{ + resolvedTs: lastCRTs, + }: + } + return nil + }) + c.Assert(err, check.IsNil) + lastCRTs = mockedRow.CommitTs + + select { + case <-ctx.Done(): + return ctx.Err() + case eventCh <- &mockedEvent{ + size: mockedRow.Size, + }: + } + } + select { + case <-ctx.Done(): + return ctx.Err() + case eventCh <- &mockedEvent{ + resolvedTs: lastCRTs, + }: + } + + return nil + }) + + errg.Go(func() error { + for { + var event *mockedEvent + select { + case <-ctx.Done(): + return ctx.Err() + case event = <-eventCh: + } + + if event == nil { + break + } + + if event.size != 0 { + atomic.AddUint64(&consumedBytes, -event.size) + } else { + flowController.Release(event.resolvedTs) + } + } + + return nil + }) + + c.Assert(errg.Wait(), check.IsNil) + c.Assert(atomic.LoadUint64(&consumedBytes), check.Equals, uint64(0)) +} + +func (s *flowControlSuite) TestFlowControlCallBackError(c *check.C) { + defer testleak.AfterTest(c)() + + var wg sync.WaitGroup + controller := NewTableFlowController(512) + wg.Add(1) + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + go func() 
{ + defer wg.Done() + err := controller.Consume(1, 511, func() error { + c.Fatalf("unreachable") + return nil + }) + c.Assert(err, check.IsNil) + err = controller.Consume(2, 511, func() error { + <-ctx.Done() + return ctx.Err() + }) + c.Assert(err, check.ErrorMatches, ".*context canceled.*") + }() + + time.Sleep(100 * time.Millisecond) + cancel() + + wg.Wait() +} + +func (s *flowControlSuite) TestFlowControlConsumeLargerThanQuota(c *check.C) { + defer testleak.AfterTest(c)() + + controller := NewTableFlowController(1024) + err := controller.Consume(1, 2048, func() error { + c.Fatalf("unreachable") + return nil + }) + c.Assert(err, check.ErrorMatches, ".*ErrFlowControllerEventLargerThanQuota.*") +} + +func BenchmarkTableFlowController(B *testing.B) { + ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5) + defer cancel() + errg, ctx := errgroup.WithContext(ctx) + mockedRowsCh := make(chan *commitTsSizeEntry, 102400) + flowController := NewTableFlowController(20 * 1024 * 1024) // 20M + + errg.Go(func() error { + lastCommitTs := uint64(1) + for i := 0; i < B.N; i++ { + if rand.Int()%15 == 0 { + lastCommitTs += 10 + } + size := uint64(1024 + rand.Int()%1024) + select { + case <-ctx.Done(): + return ctx.Err() + case mockedRowsCh <- &commitTsSizeEntry{ + CommitTs: lastCommitTs, + Size: size, + }: + } + } + + close(mockedRowsCh) + return nil + }) + + eventCh := make(chan *mockedEvent, 102400) + errg.Go(func() error { + defer close(eventCh) + resolvedTs := uint64(0) + for { + var mockedRow *commitTsSizeEntry + select { + case <-ctx.Done(): + return ctx.Err() + case mockedRow = <-mockedRowsCh: + } + + if mockedRow == nil { + break + } + + if resolvedTs != mockedRow.CommitTs { + select { + case <-ctx.Done(): + return ctx.Err() + case eventCh <- &mockedEvent{ + resolvedTs: resolvedTs, + }: + } + resolvedTs = mockedRow.CommitTs + } + err := flowController.Consume(mockedRow.CommitTs, mockedRow.Size, dummyCallBack) + if err != nil { + B.Fatal(err) + } + select { + case <-ctx.Done(): + return ctx.Err() + case eventCh <- &mockedEvent{ + size: mockedRow.Size, + }: + } + } + select { + case <-ctx.Done(): + return ctx.Err() + case eventCh <- &mockedEvent{ + resolvedTs: resolvedTs, + }: + } + + return nil + }) + + errg.Go(func() error { + for { + var event *mockedEvent + select { + case <-ctx.Done(): + return ctx.Err() + case event = <-eventCh: + } + + if event == nil { + break + } + + if event.size == 0 { + flowController.Release(event.resolvedTs) + } + } + + return nil + }) +} diff --git a/cmd/server_test.go b/cmd/server_test.go index 5dca436e276..c79565af172 100644 --- a/cmd/server_test.go +++ b/cmd/server_test.go @@ -117,6 +117,7 @@ func (s *serverSuite) TestLoadAndVerifyServerConfig(c *check.C) { KeyPath: "cc", CertAllowedCN: []string{"dd", "ee"}, }, + PerTableMemoryQuota: 20 * 1024 * 1024, // 20M }) // test decode config file @@ -169,7 +170,8 @@ sort-dir = "/tmp/just_a_test" NumWorkerPoolGoroutine: 5, SortDir: "/tmp/just_a_test", }, - Security: &config.SecurityConfig{}, + Security: &config.SecurityConfig{}, + PerTableMemoryQuota: 20 * 1024 * 1024, // 20M }) configContent = configContent + ` @@ -223,5 +225,6 @@ cert-allowed-cn = ["dd","ee"] KeyPath: "cc", CertAllowedCN: []string{"dd", "ee"}, }, + PerTableMemoryQuota: 20 * 1024 * 1024, // 20M }) } diff --git a/errors.toml b/errors.toml index 337ad0d2a52..32aec31c604 100755 --- a/errors.toml +++ b/errors.toml @@ -246,6 +246,16 @@ error = ''' filter rule is invalid ''' +["CDC:ErrFlowControllerAborted"] +error = ''' +flow controller is aborted 
+''' + +["CDC:ErrFlowControllerEventLargerThanQuota"] +error = ''' +event is larger than the total memory quota, size: %d, quota: %d +''' + ["CDC:ErrGRPCDialFailed"] error = ''' grpc dial failed diff --git a/pkg/config/config.go b/pkg/config/config.go index cc5249acf63..22b3670890e 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -160,7 +160,8 @@ var defaultServerConfig = &ServerConfig{ NumWorkerPoolGoroutine: 16, SortDir: "/tmp/cdc_sort", }, - Security: &SecurityConfig{}, + Security: &SecurityConfig{}, + PerTableMemoryQuota: 20 * 1024 * 1024, // 20MB } // ServerConfig represents a config for server @@ -181,6 +182,8 @@ type ServerConfig struct { Sorter *SorterConfig `toml:"sorter" json:"sorter"` Security *SecurityConfig `toml:"security" json:"security"` + + PerTableMemoryQuota uint64 `toml:"per-table-memory-quota" json:"per-table-memory-quota"` } // Marshal returns the json marshal format of a ServerConfig @@ -285,6 +288,13 @@ func (c *ServerConfig) ValidateAndAdjust() error { return cerror.ErrIllegalUnifiedSorterParameter.GenWithStackByArgs("max-memory-percentage should be a percentage") } + if c.PerTableMemoryQuota == 0 { + c.PerTableMemoryQuota = defaultServerConfig.PerTableMemoryQuota + } + if c.PerTableMemoryQuota < 6*1024*1024 { + return cerror.ErrInvalidServerOption.GenWithStackByArgs("per-table-memory-quota should be at least 6MB") + } + return nil } diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 6f407abe44b..fa2d747b7a8 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -86,9 +86,9 @@ func (s *serverConfigSuite) TestMarshal(c *check.C) { b, err := conf.Marshal() c.Assert(err, check.IsNil) - c.Assert(b, check.Equals, `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null}}`) + c.Assert(b, check.Equals, `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520}`) conf2 := new(ServerConfig) - err = conf2.Unmarshal([]byte(`{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null}}`)) + err = 
conf2.Unmarshal([]byte(`{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520}`)) c.Assert(err, check.IsNil) c.Assert(conf2, check.DeepEquals, conf) } @@ -124,4 +124,7 @@ func (s *serverConfigSuite) TestValidateAndAdjust(c *check.C) { c.Assert(conf.ValidateAndAdjust(), check.ErrorMatches, ".*must be specified.*") conf.AdvertiseAddr = "advertise" c.Assert(conf.ValidateAndAdjust(), check.ErrorMatches, ".*does not contain a port") + conf.AdvertiseAddr = "advertise:1234" + conf.PerTableMemoryQuota = 1 + c.Assert(conf.ValidateAndAdjust(), check.ErrorMatches, ".*should be at least.*") } diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index d527dd15754..5db27545c32 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -208,4 +208,8 @@ var ( // owner related errors ErrOwnerInconsistentStates = errors.Normalize("owner encountered inconsistent state. report a bug if this happens frequently. %s", errors.RFCCodeText("CDC:ErrOwnerInconsistentStates")) + + // miscellaneous internal errors + ErrFlowControllerAborted = errors.Normalize("flow controller is aborted", errors.RFCCodeText("CDC:ErrFlowControllerAborted")) + ErrFlowControllerEventLargerThanQuota = errors.Normalize("event is larger than the total memory quota, size: %d, quota: %d", errors.RFCCodeText("CDC:ErrFlowControllerEventLargerThanQuota")) ) diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index 13eaa3ec13f..a5130fe58a2 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -27,7 +27,7 @@ import ( // the count of sorted data and unmounted data. In current benchmark a single // processor can reach 50k-100k QPS, and accumulated data is around // 200k-400k in most cases. We need a better chan cache mechanism. 
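+// NOTE: reduced in this patch; per-table flow control now bounds how much data can be
+// in flight between the sorter and the sink, so a very large buffer here mostly wastes memory.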
-const defaultOutputChannelSize = 1280000 +const defaultOutputChannelSize = 512 // Pipeline represents a pipeline includes a number of nodes type Pipeline struct { From 717b94480c14217ea5b00a80b32a25a00aebad1b Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Wed, 26 May 2021 22:45:26 +0800 Subject: [PATCH 2/3] format --- cdc/processor/pipeline/table.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/processor/pipeline/table.go b/cdc/processor/pipeline/table.go index a95fa9291e2..8c87203aa6a 100644 --- a/cdc/processor/pipeline/table.go +++ b/cdc/processor/pipeline/table.go @@ -177,7 +177,7 @@ func NewTablePipeline(ctx context.Context, zap.Int64("table-id", tableID), zap.Uint64("quota", perTableMemoryQuota)) flowController := common.NewTableFlowController(perTableMemoryQuota) - + p := pipeline.NewPipeline(ctx, 500*time.Millisecond) p.AppendNode(ctx, "puller", newPullerNode(changefeedID, credential, kvStorage, limitter, tableID, replicaInfo, tableName)) p.AppendNode(ctx, "sorter", newSorterNode(sortEngine, sortDir, changefeedID, tableName, tableID, flowController)) From 391ebae3cd27f6c46bb18c0d462573fe55d42aa0 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Thu, 27 May 2021 10:09:58 +0800 Subject: [PATCH 3/3] fix metrics --- cdc/metrics_processor.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cdc/metrics_processor.go b/cdc/metrics_processor.go index 3e209415bb3..3d784f6836f 100644 --- a/cdc/metrics_processor.go +++ b/cdc/metrics_processor.go @@ -102,4 +102,5 @@ func initProcessorMetrics(registry *prometheus.Registry) { registry.MustRegister(txnCounter) registry.MustRegister(updateInfoDuration) registry.MustRegister(processorErrorCounter) + registry.MustRegister(tableMemoryGauge) }
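
Reviewer note, not part of the patch set: the sketch below distills the call pattern the new common.TableFlowController expects, based on runFlowControl, the sorter node and the sink node changes above. The function name consumeLoop and the channel plumbing are illustrative assumptions, not code from this series.

package example

import (
	"github.com/pingcap/ticdc/cdc/model"
	"github.com/pingcap/ticdc/cdc/sink/common"
)

// consumeLoop mirrors how the processor drives the flow controller: Consume per row,
// Release when the sink checkpoint advances, Abort on teardown to wake up any waiter.
func consumeLoop(fc *common.TableFlowController, events <-chan *model.PolymorphicEvent, checkpoints <-chan uint64) error {
	defer fc.Abort() // unblocks a Consume that is waiting on quota during shutdown
	for {
		select {
		case ev, ok := <-events:
			if !ok {
				return nil
			}
			if ev.RawKV.OpType == model.OpTypeResolved {
				continue // resolved events carry no row data and are not charged
			}
			// Blocks once the per-table quota is used up, except within a single
			// transaction (same commit-ts), where ForceConsume is used internally.
			err := fc.Consume(ev.CRTs, uint64(ev.RawKV.ApproximateSize()), func() error {
				// The real callers emit a resolved event here so the sink flushes
				// and frees quota; this sketch simply returns nil.
				return nil
			})
			if err != nil {
				return err // e.g. ErrFlowControllerAborted
			}
		case ckpt := <-checkpoints:
			// The sink has flushed everything committed at or before ckpt.
			fc.Release(ckpt)
		}
	}
}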