From 38a2821ea5ff7218bb0cfc4319826a3c0de9fd51 Mon Sep 17 00:00:00 2001
From: leoppro <zhaoyilin@pingcap.com>
Date: Thu, 13 May 2021 17:07:40 +0800
Subject: [PATCH] This is an automated cherry-pick of #1773

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
---
 cdc/processor/pipeline/cyclic_mark_test.go | 16 +++++++--
 cdc/processor/pipeline/puller.go           | 18 ++++++++--
 cdc/processor/pipeline/sink.go             | 19 ++++++----
 cdc/processor/pipeline/sink_test.go        | 42 ++++++++++++++++++----
 cdc/processor/pipeline/table.go            | 20 +++++++++--
 5 files changed, 96 insertions(+), 19 deletions(-)

diff --git a/cdc/processor/pipeline/cyclic_mark_test.go b/cdc/processor/pipeline/cyclic_mark_test.go
index f22b7bddce1..133bd6a449a 100644
--- a/cdc/processor/pipeline/cyclic_mark_test.go
+++ b/cdc/processor/pipeline/cyclic_mark_test.go
@@ -14,7 +14,7 @@
 package pipeline
 
 import (
-	stdContext "context"
+	"context"
 	"sort"
 	"sync"
 
@@ -22,7 +22,7 @@ import (
 	"github.com/pingcap/check"
 	"github.com/pingcap/ticdc/cdc/model"
 	"github.com/pingcap/ticdc/pkg/config"
-	"github.com/pingcap/ticdc/pkg/context"
+	cdcContext "github.com/pingcap/ticdc/pkg/context"
 	"github.com/pingcap/ticdc/pkg/cyclic/mark"
 	"github.com/pingcap/ticdc/pkg/pipeline"
 	"github.com/pingcap/ticdc/pkg/util/testleak"
@@ -131,12 +131,24 @@ func (s *markSuite) TestCyclicMarkNode(c *check.C) {
 	}
 
 	for _, tc := range testCases {
+<<<<<<< HEAD
 		ctx := context.NewContext(stdContext.Background(), &context.Vars{
 			Config: &config.ReplicaConfig{
 				Cyclic: &config.CyclicConfig{
 					Enable:          true,
 					ReplicaID:       tc.replicaID,
 					FilterReplicaID: tc.filterID,
+=======
+		ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
+		ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
+			Info: &model.ChangeFeedInfo{
+				Config: &config.ReplicaConfig{
+					Cyclic: &config.CyclicConfig{
+						Enable:          true,
+						ReplicaID:       tc.replicaID,
+						FilterReplicaID: tc.filterID,
+					},
+>>>>>>> 58c6ca1f (context: uniform the import naming of context, part 1 (#1773))
 				},
 			},
 		})
diff --git a/cdc/processor/pipeline/puller.go b/cdc/processor/pipeline/puller.go
index 6a2a0b6da5a..b43fbb1f0da 100644
--- a/cdc/processor/pipeline/puller.go
+++ b/cdc/processor/pipeline/puller.go
@@ -14,12 +14,17 @@
 package pipeline
 
 import (
-	stdContext "context"
+	"context"
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/ticdc/cdc/model"
 	"github.com/pingcap/ticdc/cdc/puller"
+<<<<<<< HEAD
 	"github.com/pingcap/ticdc/pkg/context"
+=======
+	"github.com/pingcap/ticdc/pkg/config"
+	cdcContext "github.com/pingcap/ticdc/pkg/context"
+>>>>>>> 58c6ca1f (context: uniform the import naming of context, part 1 (#1773))
 	"github.com/pingcap/ticdc/pkg/pipeline"
 	"github.com/pingcap/ticdc/pkg/regionspan"
 	"github.com/pingcap/ticdc/pkg/security"
@@ -39,7 +44,7 @@ type pullerNode struct {
 
 	tableID     model.TableID
 	replicaInfo *model.TableReplicaInfo
-	cancel      stdContext.CancelFunc
+	cancel      context.CancelFunc
 	wg          errgroup.Group
 }
 
@@ -60,7 +65,7 @@ func newPullerNode(
 	}
 }
 
-func (n *pullerNode) tableSpan(ctx context.Context) []regionspan.Span {
+func (n *pullerNode) tableSpan(ctx cdcContext.Context) []regionspan.Span {
 	// start table puller
 	enableOldValue := ctx.Vars().Config.EnableOldValue
 	spans := make([]regionspan.Span, 0, 4)
@@ -73,9 +78,16 @@ func (n *pullerNode) tableSpan(ctx context.Context) []regionspan.Span {
 }
 
 func (n *pullerNode) Init(ctx pipeline.NodeContext) error {
+<<<<<<< HEAD
 	metricTableResolvedTsGauge := tableResolvedTsGauge.WithLabelValues(n.changefeedID, ctx.Vars().CaptureAddr, n.tableName)
 	enableOldValue := ctx.Vars().Config.EnableOldValue
 	ctxC, cancel := stdContext.WithCancel(ctx.StdContext())
+=======
+	metricTableResolvedTsGauge := tableResolvedTsGauge.WithLabelValues(ctx.ChangefeedVars().ID, ctx.GlobalVars().CaptureInfo.AdvertiseAddr, n.tableName)
+	globalConfig := config.GetGlobalServerConfig()
+	config := ctx.ChangefeedVars().Info.Config
+	ctxC, cancel := context.WithCancel(ctx)
+>>>>>>> 58c6ca1f (context: uniform the import naming of context, part 1 (#1773))
 	ctxC = util.PutTableInfoInCtx(ctxC, n.tableID, n.tableName)
 	plr := puller.NewPuller(ctxC, ctx.Vars().PDClient, n.credential, n.kvStorage,
 		n.replicaInfo.StartTs, n.tableSpan(ctx), n.limitter, enableOldValue)
diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go
index 9932c21f33a..79a03596bc5 100644
--- a/cdc/processor/pipeline/sink.go
+++ b/cdc/processor/pipeline/sink.go
@@ -53,11 +53,13 @@ func (s TableStatus) String() string {
 	return "Unknown"
 }
 
-func (s *TableStatus) load() TableStatus {
+// Load TableStatus with THREAD-SAFE
+func (s *TableStatus) Load() TableStatus {
 	return TableStatus(atomic.LoadInt32((*int32)(s)))
 }
 
-func (s *TableStatus) store(new TableStatus) {
+// Store TableStatus with THREAD-SAFE
+func (s *TableStatus) Store(new TableStatus) {
 	atomic.StoreInt32((*int32)(s), int32(new))
 }
 
@@ -87,7 +89,7 @@ func newSinkNode(sink sink.Sink, startTs model.Ts, targetTs model.Ts) *sinkNode
 
 func (n *sinkNode) ResolvedTs() model.Ts   { return atomic.LoadUint64(&n.resolvedTs) }
 func (n *sinkNode) CheckpointTs() model.Ts { return atomic.LoadUint64(&n.checkpointTs) }
-func (n *sinkNode) Status() TableStatus    { return n.status.load() }
+func (n *sinkNode) Status() TableStatus    { return n.status.Load() }
 
 func (n *sinkNode) Init(ctx pipeline.NodeContext) error {
 	// do nothing
@@ -97,11 +99,11 @@ func (n *sinkNode) Init(ctx pipeline.NodeContext) error {
 func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err error) {
 	defer func() {
 		if err != nil {
-			n.status.store(TableStatusStopped)
+			n.status.Store(TableStatusStopped)
 			return
 		}
 		if n.checkpointTs >= n.targetTs {
-			n.status.store(TableStatusStopped)
+			n.status.Store(TableStatusStopped)
 			err = n.sink.Close()
 			if err != nil {
 				err = errors.Trace(err)
@@ -178,7 +180,7 @@ func (n *sinkNode) Receive(ctx pipeline.NodeContext) error {
 		event := msg.PolymorphicEvent
 		if event.RawKV.OpType == model.OpTypeResolved {
 			if n.status == TableStatusInitializing {
-				n.status.store(TableStatusRunning)
+				n.status.Store(TableStatusRunning)
 			}
 			failpoint.Inject("ProcessorSyncResolvedError", func() {
 				failpoint.Return(errors.New("processor sync resolved injected error"))
@@ -215,6 +217,11 @@ func (n *sinkNode) Receive(ctx pipeline.NodeContext) error {
 }
 
 func (n *sinkNode) Destroy(ctx pipeline.NodeContext) error {
+<<<<<<< HEAD
 	n.status.store(TableStatusStopped)
+=======
+	n.status.Store(TableStatusStopped)
+	n.flowController.Abort()
+>>>>>>> 58c6ca1f (context: uniform the import naming of context, part 1 (#1773))
 	return n.sink.Close()
 }
diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go
index d45ace78881..4ce0ff70883 100644
--- a/cdc/processor/pipeline/sink_test.go
+++ b/cdc/processor/pipeline/sink_test.go
@@ -14,12 +14,12 @@
 package pipeline
 
 import (
-	stdContext "context"
+	"context"
 	"testing"
 
 	"github.com/pingcap/check"
 	"github.com/pingcap/ticdc/cdc/model"
-	"github.com/pingcap/ticdc/pkg/context"
+	cdcContext "github.com/pingcap/ticdc/pkg/context"
 	cerrors "github.com/pingcap/ticdc/pkg/errors"
 	"github.com/pingcap/ticdc/pkg/pipeline"
 	"github.com/pingcap/ticdc/pkg/util/testleak"
@@ -36,11 +36,33 @@ type mockSink struct {
 	}
 }
 
+<<<<<<< HEAD
 func (s *mockSink) Initialize(ctx stdContext.Context, tableInfo []*model.SimpleTableInfo) error {
+=======
+// mockFlowController is created because a real tableFlowController cannot be used
+// we are testing sinkNode by itself.
+type mockFlowController struct{}
+
+func (c *mockFlowController) Consume(commitTs uint64, size uint64, blockCallBack func() error) error {
+	return nil
+}
+
+func (c *mockFlowController) Release(resolvedTs uint64) {
+}
+
+func (c *mockFlowController) Abort() {
+}
+
+func (c *mockFlowController) GetConsumption() uint64 {
+	return 0
+}
+
+func (s *mockSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
+>>>>>>> 58c6ca1f (context: uniform the import naming of context, part 1 (#1773))
 	return nil
 }
 
-func (s *mockSink) EmitRowChangedEvents(ctx stdContext.Context, rows ...*model.RowChangedEvent) error {
+func (s *mockSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
 	for _, row := range rows {
 		s.received = append(s.received, struct {
 			resolvedTs model.Ts
@@ -50,11 +72,11 @@ func (s *mockSink) EmitRowChangedEvents(ctx stdContext.Context, rows ...*model.R
 	return nil
 }
 
-func (s *mockSink) EmitDDLEvent(ctx stdContext.Context, ddl *model.DDLEvent) error {
+func (s *mockSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
 	panic("unreachable")
 }
 
-func (s *mockSink) FlushRowChangedEvents(ctx stdContext.Context, resolvedTs uint64) (uint64, error) {
+func (s *mockSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
 	s.received = append(s.received, struct {
 		resolvedTs model.Ts
 		row        *model.RowChangedEvent
@@ -62,7 +84,7 @@ func (s *mockSink) FlushRowChangedEvents(ctx stdContext.Context, resolvedTs uint
 	return resolvedTs, nil
 }
 
-func (s *mockSink) EmitCheckpointTs(ctx stdContext.Context, ts uint64) error {
+func (s *mockSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
 	panic("unreachable")
 }
 
@@ -87,7 +109,11 @@ var _ = check.Suite(&outputSuite{})
 
 func (s *outputSuite) TestStatus(c *check.C) {
 	defer testleak.AfterTest(c)()
+<<<<<<< HEAD
 	ctx := context.NewContext(stdContext.Background(), &context.Vars{})
+=======
+	ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
+>>>>>>> 58c6ca1f (context: uniform the import naming of context, part 1 (#1773))
 
 	// test stop at targetTs
 	node := newSinkNode(&mockSink{}, 0, 10)
@@ -162,7 +188,11 @@ func (s *outputSuite) TestStatus(c *check.C) {
 
 func (s *outputSuite) TestManyTs(c *check.C) {
 	defer testleak.AfterTest(c)()
+<<<<<<< HEAD
 	ctx := context.NewContext(stdContext.Background(), &context.Vars{})
+=======
+	ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
+>>>>>>> 58c6ca1f (context: uniform the import naming of context, part 1 (#1773))
 	sink := &mockSink{}
 	node := newSinkNode(sink, 0, 10)
 	c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
diff --git a/cdc/processor/pipeline/table.go b/cdc/processor/pipeline/table.go
index f218ff65a94..e1be75bd596 100644
--- a/cdc/processor/pipeline/table.go
+++ b/cdc/processor/pipeline/table.go
@@ -14,7 +14,7 @@
 package pipeline
 
 import (
-	stdContext "context"
+	"context"
 	"time"
 
 	"github.com/pingcap/ticdc/cdc/sink"
@@ -23,7 +23,14 @@ import (
 	"github.com/pingcap/ticdc/cdc/entry"
 	"github.com/pingcap/ticdc/cdc/model"
 	"github.com/pingcap/ticdc/cdc/puller"
+<<<<<<< HEAD
 	"github.com/pingcap/ticdc/pkg/context"
+=======
+	"github.com/pingcap/ticdc/cdc/sink"
+	"github.com/pingcap/ticdc/cdc/sink/common"
+	serverConfig "github.com/pingcap/ticdc/pkg/config"
+	cdcContext "github.com/pingcap/ticdc/pkg/context"
+>>>>>>> 58c6ca1f (context: uniform the import naming of context, part 1 (#1773))
 	cerror "github.com/pingcap/ticdc/pkg/errors"
 	"github.com/pingcap/ticdc/pkg/pipeline"
 	"github.com/pingcap/ticdc/pkg/security"
@@ -63,7 +70,7 @@ type tablePipelineImpl struct {
 	tableName   string // quoted schema and table, used in metircs only
 
 	sinkNode *sinkNode
-	cancel   stdContext.CancelFunc
+	cancel   context.CancelFunc
 }
 
 // ResolvedTs returns the resolved ts in this table pipeline
@@ -133,10 +140,14 @@ func (t *tablePipelineImpl) Wait() []error {
 // NewTablePipeline creates a table pipeline
 // TODO(leoppro): the parameters in this function are too much, try to move some parameters into ctx.Vars().
 // TODO(leoppro): implement a mock kvclient to test the table pipeline
+<<<<<<< HEAD
 func NewTablePipeline(ctx context.Context,
 	changefeedID model.ChangeFeedID,
 	credential *security.Credential,
 	kvStorage tidbkv.Storage,
+=======
+func NewTablePipeline(ctx cdcContext.Context,
+>>>>>>> 58c6ca1f (context: uniform the import naming of context, part 1 (#1773))
 	limitter *puller.BlurResourceLimitter,
 	mounter entry.Mounter,
 	sortEngine model.SortEngine,
@@ -145,8 +156,13 @@ func NewTablePipeline(ctx context.Context,
 	tableName string,
 	replicaInfo *model.TableReplicaInfo,
 	sink sink.Sink,
+<<<<<<< HEAD
 	targetTs model.Ts) (context.Context, TablePipeline) {
 	ctx, cancel := context.WithCancel(ctx)
+=======
+	targetTs model.Ts) TablePipeline {
+	ctx, cancel := cdcContext.WithCancel(ctx)
+>>>>>>> 58c6ca1f (context: uniform the import naming of context, part 1 (#1773))
 	tablePipeline := &tablePipelineImpl{
 		tableID:     tableID,
 		markTableID: replicaInfo.MarkTableID,