Commit

context: uniform the import naming of context, part 1 (pingcap#1773)
leoppro authored and leoppro committed Jun 17, 2021
1 parent 3d58986 commit a8befdd
Showing 5 changed files with 32 additions and 30 deletions.
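The change is mechanical and sets the convention for the rest of the series: the standard library drops its old stdContext alias and keeps its natural name "context", while the ticdc package github.com/pingcap/ticdc/pkg/context is always imported under the explicit alias cdcContext. A minimal sketch of the resulting convention (an illustrative file, not part of the commit; only the import paths are taken from the diff below):

package pipeline

import (
	"context" // standard library, no alias needed anymore

	cdcContext "github.com/pingcap/ticdc/pkg/context" // ticdc's context package, always aliased
)

// Illustrative references showing which name now points at which package.
var (
	_ context.CancelFunc    // the standard library's "context"
	_ cdcContext.GlobalVars // ticdc's pkg/context, referred to as cdcContext
)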
8 changes: 4 additions & 4 deletions cdc/processor/pipeline/cyclic_mark_test.go
@@ -14,15 +14,15 @@
 package pipeline
 
 import (
-	stdContext "context"
+	"context"
 	"sort"
 	"sync"
 
 	"github.com/google/go-cmp/cmp"
 	"github.com/pingcap/check"
 	"github.com/pingcap/ticdc/cdc/model"
 	"github.com/pingcap/ticdc/pkg/config"
-	"github.com/pingcap/ticdc/pkg/context"
+	cdcContext "github.com/pingcap/ticdc/pkg/context"
 	"github.com/pingcap/ticdc/pkg/cyclic/mark"
 	"github.com/pingcap/ticdc/pkg/pipeline"
 	"github.com/pingcap/ticdc/pkg/util/testleak"
@@ -131,8 +131,8 @@ func (s *markSuite) TestCyclicMarkNode(c *check.C) {
 	}
 
 	for _, tc := range testCases {
-		ctx := context.NewContext(stdContext.Background(), &context.GlobalVars{})
-		ctx = context.WithChangefeedVars(ctx, &context.ChangefeedVars{
+		ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
+		ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
 			Info: &model.ChangeFeedInfo{
 				Config: &config.ReplicaConfig{
 					Cyclic: &config.CyclicConfig{
10 changes: 5 additions & 5 deletions cdc/processor/pipeline/puller.go
@@ -14,13 +14,13 @@
 package pipeline
 
 import (
-	stdContext "context"
+	"context"
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/ticdc/cdc/model"
 	"github.com/pingcap/ticdc/cdc/puller"
 	"github.com/pingcap/ticdc/pkg/config"
-	"github.com/pingcap/ticdc/pkg/context"
+	cdcContext "github.com/pingcap/ticdc/pkg/context"
 	"github.com/pingcap/ticdc/pkg/pipeline"
 	"github.com/pingcap/ticdc/pkg/regionspan"
 	"github.com/pingcap/ticdc/pkg/util"
@@ -35,7 +35,7 @@ type pullerNode struct {
 
 	tableID     model.TableID
 	replicaInfo *model.TableReplicaInfo
-	cancel      stdContext.CancelFunc
+	cancel      context.CancelFunc
 	wg          errgroup.Group
 }
 
@@ -50,7 +50,7 @@ func newPullerNode(
 	}
 }
 
-func (n *pullerNode) tableSpan(ctx context.Context) []regionspan.Span {
+func (n *pullerNode) tableSpan(ctx cdcContext.Context) []regionspan.Span {
 	// start table puller
 	config := ctx.ChangefeedVars().Info.Config
 	spans := make([]regionspan.Span, 0, 4)
@@ -66,7 +66,7 @@ func (n *pullerNode) Init(ctx pipeline.NodeContext) error {
 	metricTableResolvedTsGauge := tableResolvedTsGauge.WithLabelValues(ctx.ChangefeedVars().ID, ctx.GlobalVars().CaptureInfo.AdvertiseAddr, n.tableName)
 	globalConfig := config.GetGlobalServerConfig()
 	config := ctx.ChangefeedVars().Info.Config
-	ctxC, cancel := stdContext.WithCancel(ctx)
+	ctxC, cancel := context.WithCancel(ctx)
 	ctxC = util.PutTableInfoInCtx(ctxC, n.tableID, n.tableName)
 	plr := puller.NewPuller(ctxC, ctx.GlobalVars().PDClient, globalConfig.Security, ctx.GlobalVars().KVStorage,
 		n.replicaInfo.StartTs, n.tableSpan(ctx), n.limitter, config.EnableOldValue)
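Note how Init above hands its context straight to the standard library's context.WithCancel even though the value is a richer pipeline.NodeContext. That compiles because the CDC context types satisfy the standard context.Context interface. A small self-contained sketch of that pattern, using hypothetical stand-in types (cdcLikeContext and exampleCtx are not ticdc names):

package main

import (
	"context"
	"fmt"
)

// cdcLikeContext stands in for a richer, application-specific context:
// it embeds the standard interface and adds its own accessors.
type cdcLikeContext interface {
	context.Context
	GlobalVars() string // illustrative extra accessor
}

// exampleCtx satisfies cdcLikeContext by embedding a standard context.Context.
type exampleCtx struct {
	context.Context
	vars string
}

func (c exampleCtx) GlobalVars() string { return c.vars }

func main() {
	var ctx cdcLikeContext = exampleCtx{Context: context.Background(), vars: "demo"}

	// Because cdcLikeContext embeds context.Context, standard helpers such as
	// context.WithCancel accept it directly, just as pullerNode.Init does above.
	ctxC, cancel := context.WithCancel(ctx)
	defer cancel()

	fmt.Println(ctxC.Err(), ctx.GlobalVars()) // prints: <nil> demo
}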
16 changes: 9 additions & 7 deletions cdc/processor/pipeline/sink.go
@@ -53,11 +53,13 @@ func (s TableStatus) String() string {
 	return "Unknown"
 }
 
-func (s *TableStatus) load() TableStatus {
+// Load TableStatus with THREAD-SAFE
+func (s *TableStatus) Load() TableStatus {
 	return TableStatus(atomic.LoadInt32((*int32)(s)))
 }
 
-func (s *TableStatus) store(new TableStatus) {
+// Store TableStatus with THREAD-SAFE
+func (s *TableStatus) Store(new TableStatus) {
 	atomic.StoreInt32((*int32)(s), int32(new))
 }
 
@@ -91,7 +93,7 @@ func newSinkNode(sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowContro
 
 func (n *sinkNode) ResolvedTs() model.Ts   { return atomic.LoadUint64(&n.resolvedTs) }
 func (n *sinkNode) CheckpointTs() model.Ts { return atomic.LoadUint64(&n.checkpointTs) }
-func (n *sinkNode) Status() TableStatus    { return n.status.load() }
+func (n *sinkNode) Status() TableStatus    { return n.status.Load() }
 
 func (n *sinkNode) Init(ctx pipeline.NodeContext) error {
 	// do nothing
@@ -101,11 +103,11 @@ func (n *sinkNode) Init(ctx pipeline.NodeContext) error {
 func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err error) {
 	defer func() {
 		if err != nil {
-			n.status.store(TableStatusStopped)
+			n.status.Store(TableStatusStopped)
 			return
 		}
 		if n.checkpointTs >= n.targetTs {
-			n.status.store(TableStatusStopped)
+			n.status.Store(TableStatusStopped)
 			err = n.sink.Close()
 			if err != nil {
 				err = errors.Trace(err)
@@ -183,7 +185,7 @@ func (n *sinkNode) Receive(ctx pipeline.NodeContext) error {
 		event := msg.PolymorphicEvent
 		if event.RawKV.OpType == model.OpTypeResolved {
 			if n.status == TableStatusInitializing {
-				n.status.store(TableStatusRunning)
+				n.status.Store(TableStatusRunning)
 			}
 			failpoint.Inject("ProcessorSyncResolvedError", func() {
 				failpoint.Return(errors.New("processor sync resolved injected error"))
@@ -220,7 +222,7 @@ func (n *sinkNode) Receive(ctx pipeline.NodeContext) error {
 }
 
 func (n *sinkNode) Destroy(ctx pipeline.NodeContext) error {
-	n.status.store(TableStatusStopped)
+	n.status.Store(TableStatusStopped)
 	n.flowController.Abort()
 	return n.sink.Close()
 }
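Besides the renames, this file also exports load/store as Load/Store. They are thin wrappers over sync/atomic, so the int32-backed table status can be read and written from several goroutines without a mutex. A self-contained sketch of the pattern; the constant values below are illustrative, the real declarations live elsewhere in sink.go:

package main

import (
	"fmt"
	"sync/atomic"
)

// TableStatus is backed by an int32 so it can be updated atomically.
type TableStatus int32

// Illustrative constants; the actual set and order are defined in sink.go.
const (
	TableStatusInitializing TableStatus = iota
	TableStatusRunning
	TableStatusStopped
)

// Load reads the status atomically, so it is safe for concurrent use.
func (s *TableStatus) Load() TableStatus {
	return TableStatus(atomic.LoadInt32((*int32)(s)))
}

// Store writes the status atomically, so it is safe for concurrent use.
func (s *TableStatus) Store(new TableStatus) {
	atomic.StoreInt32((*int32)(s), int32(new))
}

func main() {
	var st TableStatus
	st.Store(TableStatusRunning)
	fmt.Println(st.Load() == TableStatusRunning) // prints: true
}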
18 changes: 9 additions & 9 deletions cdc/processor/pipeline/sink_test.go
@@ -14,12 +14,12 @@
 package pipeline
 
 import (
-	stdContext "context"
+	"context"
 	"testing"
 
 	"github.com/pingcap/check"
 	"github.com/pingcap/ticdc/cdc/model"
-	"github.com/pingcap/ticdc/pkg/context"
+	cdcContext "github.com/pingcap/ticdc/pkg/context"
 	cerrors "github.com/pingcap/ticdc/pkg/errors"
 	"github.com/pingcap/ticdc/pkg/pipeline"
 	"github.com/pingcap/ticdc/pkg/util/testleak"
@@ -54,11 +54,11 @@ func (c *mockFlowController) GetConsumption() uint64 {
 	return 0
 }
 
-func (s *mockSink) Initialize(ctx stdContext.Context, tableInfo []*model.SimpleTableInfo) error {
+func (s *mockSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
 	return nil
 }
 
-func (s *mockSink) EmitRowChangedEvents(ctx stdContext.Context, rows ...*model.RowChangedEvent) error {
+func (s *mockSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
 	for _, row := range rows {
 		s.received = append(s.received, struct {
 			resolvedTs model.Ts
@@ -68,19 +68,19 @@ func (s *mockSink) EmitRowChangedEvents(ctx stdContext.Context, rows ...*model.R
 	return nil
 }
 
-func (s *mockSink) EmitDDLEvent(ctx stdContext.Context, ddl *model.DDLEvent) error {
+func (s *mockSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
 	panic("unreachable")
 }
 
-func (s *mockSink) FlushRowChangedEvents(ctx stdContext.Context, resolvedTs uint64) (uint64, error) {
+func (s *mockSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
 	s.received = append(s.received, struct {
 		resolvedTs model.Ts
 		row        *model.RowChangedEvent
 	}{resolvedTs: resolvedTs})
 	return resolvedTs, nil
 }
 
-func (s *mockSink) EmitCheckpointTs(ctx stdContext.Context, ts uint64) error {
+func (s *mockSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
 	panic("unreachable")
 }
 
@@ -105,7 +105,7 @@ var _ = check.Suite(&outputSuite{})
 
 func (s *outputSuite) TestStatus(c *check.C) {
 	defer testleak.AfterTest(c)()
-	ctx := context.NewContext(stdContext.Background(), &context.GlobalVars{})
+	ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
 
 	// test stop at targetTs
 	node := newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
@@ -180,7 +180,7 @@ func (s *outputSuite) TestStatus(c *check.C) {
 
 func (s *outputSuite) TestManyTs(c *check.C) {
 	defer testleak.AfterTest(c)()
-	ctx := context.NewContext(stdContext.Background(), &context.GlobalVars{})
+	ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
 	sink := &mockSink{}
 	node := newSinkNode(sink, 0, 10, &mockFlowController{})
 	c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
10 changes: 5 additions & 5 deletions cdc/processor/pipeline/table.go
@@ -14,7 +14,7 @@
 package pipeline
 
 import (
-	stdContext "context"
+	"context"
 	"time"
 
 	"github.com/pingcap/log"
@@ -24,7 +24,7 @@ import (
 	"github.com/pingcap/ticdc/cdc/sink"
 	"github.com/pingcap/ticdc/cdc/sink/common"
 	serverConfig "github.com/pingcap/ticdc/pkg/config"
-	"github.com/pingcap/ticdc/pkg/context"
+	cdcContext "github.com/pingcap/ticdc/pkg/context"
 	cerror "github.com/pingcap/ticdc/pkg/errors"
 	"github.com/pingcap/ticdc/pkg/pipeline"
 	"go.uber.org/zap"
@@ -68,7 +68,7 @@ type tablePipelineImpl struct {
 	tableName string // quoted schema and table, used in metircs only
 
 	sinkNode *sinkNode
-	cancel   stdContext.CancelFunc
+	cancel   context.CancelFunc
 }
 
 // TODO find a better name or avoid using an interface
@@ -146,15 +146,15 @@ func (t *tablePipelineImpl) Wait() {
 
 // NewTablePipeline creates a table pipeline
 // TODO(leoppro): implement a mock kvclient to test the table pipeline
-func NewTablePipeline(ctx context.Context,
+func NewTablePipeline(ctx cdcContext.Context,
 	limitter *puller.BlurResourceLimitter,
 	mounter entry.Mounter,
 	tableID model.TableID,
 	tableName string,
 	replicaInfo *model.TableReplicaInfo,
 	sink sink.Sink,
 	targetTs model.Ts) TablePipeline {
-	ctx, cancel := context.WithCancel(ctx)
+	ctx, cancel := cdcContext.WithCancel(ctx)
 	tablePipeline := &tablePipelineImpl{
 		tableID:     tableID,
 		markTableID: replicaInfo.MarkTableID,
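Taken together, the call sites now read consistently: a CDC context is built on top of a plain standard context, and cancellation is derived through the CDC package itself, which hands back an ordinary context.CancelFunc. A usage sketch assuming only the signatures visible in this diff (exampleUsage is a hypothetical helper, not part of the commit):

package pipeline

import (
	"context"

	cdcContext "github.com/pingcap/ticdc/pkg/context"
)

// exampleUsage mirrors how the tests and NewTablePipeline above wire the
// two packages together; it is illustrative only.
func exampleUsage() {
	// Build the CDC context on top of a plain standard context.
	ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})

	// Derive a cancelable child through the CDC package; cancel is an
	// ordinary standard-library context.CancelFunc, as the struct field above shows.
	ctx, cancel := cdcContext.WithCancel(ctx)
	defer cancel()

	_ = ctx // hand ctx to NewTablePipeline or a similar constructor here
}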
