From 58c6ca1f9cd26102d8e6e709f6f2803cf2488eec Mon Sep 17 00:00:00 2001 From: leoppro Date: Thu, 13 May 2021 17:07:40 +0800 Subject: [PATCH] context: uniform the import naming of context, part 1 (#1773) --- cdc/processor/pipeline/cyclic_mark_test.go | 8 ++++---- cdc/processor/pipeline/puller.go | 10 +++++----- cdc/processor/pipeline/sink.go | 16 +++++++++------- cdc/processor/pipeline/sink_test.go | 18 +++++++++--------- cdc/processor/pipeline/table.go | 10 +++++----- 5 files changed, 32 insertions(+), 30 deletions(-) diff --git a/cdc/processor/pipeline/cyclic_mark_test.go b/cdc/processor/pipeline/cyclic_mark_test.go index 430f5cd2ab8..95e775960c2 100644 --- a/cdc/processor/pipeline/cyclic_mark_test.go +++ b/cdc/processor/pipeline/cyclic_mark_test.go @@ -14,7 +14,7 @@ package pipeline import ( - stdContext "context" + "context" "sort" "sync" @@ -22,7 +22,7 @@ import ( "github.com/pingcap/check" "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/pkg/config" - "github.com/pingcap/ticdc/pkg/context" + cdcContext "github.com/pingcap/ticdc/pkg/context" "github.com/pingcap/ticdc/pkg/cyclic/mark" "github.com/pingcap/ticdc/pkg/pipeline" "github.com/pingcap/ticdc/pkg/util/testleak" @@ -131,8 +131,8 @@ func (s *markSuite) TestCyclicMarkNode(c *check.C) { } for _, tc := range testCases { - ctx := context.NewContext(stdContext.Background(), &context.GlobalVars{}) - ctx = context.WithChangefeedVars(ctx, &context.ChangefeedVars{ + ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) + ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ Info: &model.ChangeFeedInfo{ Config: &config.ReplicaConfig{ Cyclic: &config.CyclicConfig{ diff --git a/cdc/processor/pipeline/puller.go b/cdc/processor/pipeline/puller.go index d3891b81fc0..db5d6a9b419 100644 --- a/cdc/processor/pipeline/puller.go +++ b/cdc/processor/pipeline/puller.go @@ -14,13 +14,13 @@ package pipeline import ( - stdContext "context" + "context" "github.com/pingcap/errors" "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/cdc/puller" "github.com/pingcap/ticdc/pkg/config" - "github.com/pingcap/ticdc/pkg/context" + cdcContext "github.com/pingcap/ticdc/pkg/context" "github.com/pingcap/ticdc/pkg/pipeline" "github.com/pingcap/ticdc/pkg/regionspan" "github.com/pingcap/ticdc/pkg/util" @@ -35,7 +35,7 @@ type pullerNode struct { tableID model.TableID replicaInfo *model.TableReplicaInfo - cancel stdContext.CancelFunc + cancel context.CancelFunc wg errgroup.Group } @@ -50,7 +50,7 @@ func newPullerNode( } } -func (n *pullerNode) tableSpan(ctx context.Context) []regionspan.Span { +func (n *pullerNode) tableSpan(ctx cdcContext.Context) []regionspan.Span { // start table puller config := ctx.ChangefeedVars().Info.Config spans := make([]regionspan.Span, 0, 4) @@ -66,7 +66,7 @@ func (n *pullerNode) Init(ctx pipeline.NodeContext) error { metricTableResolvedTsGauge := tableResolvedTsGauge.WithLabelValues(ctx.ChangefeedVars().ID, ctx.GlobalVars().CaptureInfo.AdvertiseAddr, n.tableName) globalConfig := config.GetGlobalServerConfig() config := ctx.ChangefeedVars().Info.Config - ctxC, cancel := stdContext.WithCancel(ctx) + ctxC, cancel := context.WithCancel(ctx) ctxC = util.PutTableInfoInCtx(ctxC, n.tableID, n.tableName) plr := puller.NewPuller(ctxC, ctx.GlobalVars().PDClient, globalConfig.Security, ctx.GlobalVars().KVStorage, n.replicaInfo.StartTs, n.tableSpan(ctx), n.limitter, config.EnableOldValue) diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index 122281ef834..fa492118e20 100644 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -53,11 +53,13 @@ func (s TableStatus) String() string { return "Unknown" } -func (s *TableStatus) load() TableStatus { +// Load TableStatus with THREAD-SAFE +func (s *TableStatus) Load() TableStatus { return TableStatus(atomic.LoadInt32((*int32)(s))) } -func (s *TableStatus) store(new TableStatus) { +// Store TableStatus with THREAD-SAFE +func (s *TableStatus) Store(new TableStatus) { atomic.StoreInt32((*int32)(s), int32(new)) } @@ -91,7 +93,7 @@ func newSinkNode(sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowContro func (n *sinkNode) ResolvedTs() model.Ts { return atomic.LoadUint64(&n.resolvedTs) } func (n *sinkNode) CheckpointTs() model.Ts { return atomic.LoadUint64(&n.checkpointTs) } -func (n *sinkNode) Status() TableStatus { return n.status.load() } +func (n *sinkNode) Status() TableStatus { return n.status.Load() } func (n *sinkNode) Init(ctx pipeline.NodeContext) error { // do nothing @@ -101,11 +103,11 @@ func (n *sinkNode) Init(ctx pipeline.NodeContext) error { func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err error) { defer func() { if err != nil { - n.status.store(TableStatusStopped) + n.status.Store(TableStatusStopped) return } if n.checkpointTs >= n.targetTs { - n.status.store(TableStatusStopped) + n.status.Store(TableStatusStopped) err = n.sink.Close() if err != nil { err = errors.Trace(err) @@ -183,7 +185,7 @@ func (n *sinkNode) Receive(ctx pipeline.NodeContext) error { event := msg.PolymorphicEvent if event.RawKV.OpType == model.OpTypeResolved { if n.status == TableStatusInitializing { - n.status.store(TableStatusRunning) + n.status.Store(TableStatusRunning) } failpoint.Inject("ProcessorSyncResolvedError", func() { failpoint.Return(errors.New("processor sync resolved injected error")) @@ -220,7 +222,7 @@ func (n *sinkNode) Receive(ctx pipeline.NodeContext) error { } func (n *sinkNode) Destroy(ctx pipeline.NodeContext) error { - n.status.store(TableStatusStopped) + n.status.Store(TableStatusStopped) n.flowController.Abort() return n.sink.Close() } diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index 4357d05f4b3..910ac91a3c0 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -14,12 +14,12 @@ package pipeline import ( - stdContext "context" + "context" "testing" "github.com/pingcap/check" "github.com/pingcap/ticdc/cdc/model" - "github.com/pingcap/ticdc/pkg/context" + cdcContext "github.com/pingcap/ticdc/pkg/context" cerrors "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/pipeline" "github.com/pingcap/ticdc/pkg/util/testleak" @@ -54,11 +54,11 @@ func (c *mockFlowController) GetConsumption() uint64 { return 0 } -func (s *mockSink) Initialize(ctx stdContext.Context, tableInfo []*model.SimpleTableInfo) error { +func (s *mockSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { return nil } -func (s *mockSink) EmitRowChangedEvents(ctx stdContext.Context, rows ...*model.RowChangedEvent) error { +func (s *mockSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { for _, row := range rows { s.received = append(s.received, struct { resolvedTs model.Ts @@ -68,11 +68,11 @@ func (s *mockSink) EmitRowChangedEvents(ctx stdContext.Context, rows ...*model.R return nil } -func (s *mockSink) EmitDDLEvent(ctx stdContext.Context, ddl *model.DDLEvent) error { +func (s *mockSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { panic("unreachable") } -func (s *mockSink) FlushRowChangedEvents(ctx stdContext.Context, resolvedTs uint64) (uint64, error) { +func (s *mockSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) { s.received = append(s.received, struct { resolvedTs model.Ts row *model.RowChangedEvent @@ -80,7 +80,7 @@ func (s *mockSink) FlushRowChangedEvents(ctx stdContext.Context, resolvedTs uint return resolvedTs, nil } -func (s *mockSink) EmitCheckpointTs(ctx stdContext.Context, ts uint64) error { +func (s *mockSink) EmitCheckpointTs(ctx context.Context, ts uint64) error { panic("unreachable") } @@ -105,7 +105,7 @@ var _ = check.Suite(&outputSuite{}) func (s *outputSuite) TestStatus(c *check.C) { defer testleak.AfterTest(c)() - ctx := context.NewContext(stdContext.Background(), &context.GlobalVars{}) + ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) // test stop at targetTs node := newSinkNode(&mockSink{}, 0, 10, &mockFlowController{}) @@ -180,7 +180,7 @@ func (s *outputSuite) TestStatus(c *check.C) { func (s *outputSuite) TestManyTs(c *check.C) { defer testleak.AfterTest(c)() - ctx := context.NewContext(stdContext.Background(), &context.GlobalVars{}) + ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{}) sink := &mockSink{} node := newSinkNode(sink, 0, 10, &mockFlowController{}) c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil) diff --git a/cdc/processor/pipeline/table.go b/cdc/processor/pipeline/table.go index b6be122ae26..b0c53180aed 100644 --- a/cdc/processor/pipeline/table.go +++ b/cdc/processor/pipeline/table.go @@ -14,7 +14,7 @@ package pipeline import ( - stdContext "context" + "context" "time" "github.com/pingcap/log" @@ -24,7 +24,7 @@ import ( "github.com/pingcap/ticdc/cdc/sink" "github.com/pingcap/ticdc/cdc/sink/common" serverConfig "github.com/pingcap/ticdc/pkg/config" - "github.com/pingcap/ticdc/pkg/context" + cdcContext "github.com/pingcap/ticdc/pkg/context" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/pipeline" "go.uber.org/zap" @@ -68,7 +68,7 @@ type tablePipelineImpl struct { tableName string // quoted schema and table, used in metircs only sinkNode *sinkNode - cancel stdContext.CancelFunc + cancel context.CancelFunc } // TODO find a better name or avoid using an interface @@ -146,7 +146,7 @@ func (t *tablePipelineImpl) Wait() { // NewTablePipeline creates a table pipeline // TODO(leoppro): implement a mock kvclient to test the table pipeline -func NewTablePipeline(ctx context.Context, +func NewTablePipeline(ctx cdcContext.Context, limitter *puller.BlurResourceLimitter, mounter entry.Mounter, tableID model.TableID, @@ -154,7 +154,7 @@ func NewTablePipeline(ctx context.Context, replicaInfo *model.TableReplicaInfo, sink sink.Sink, targetTs model.Ts) TablePipeline { - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := cdcContext.WithCancel(ctx) tablePipeline := &tablePipelineImpl{ tableID: tableID, markTableID: replicaInfo.MarkTableID,