Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

context: uniform the import naming of context, part 1 #1773

Merged
merged 1 commit into from
May 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cdc/processor/pipeline/cyclic_mark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
package pipeline

import (
stdContext "context"
"context"
"sort"
"sync"

"github.com/google/go-cmp/cmp"
"github.com/pingcap/check"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/context"
cdcContext "github.com/pingcap/ticdc/pkg/context"
"github.com/pingcap/ticdc/pkg/cyclic/mark"
"github.com/pingcap/ticdc/pkg/pipeline"
"github.com/pingcap/ticdc/pkg/util/testleak"
Expand Down Expand Up @@ -131,8 +131,8 @@ func (s *markSuite) TestCyclicMarkNode(c *check.C) {
}

for _, tc := range testCases {
ctx := context.NewContext(stdContext.Background(), &context.GlobalVars{})
ctx = context.WithChangefeedVars(ctx, &context.ChangefeedVars{
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
Info: &model.ChangeFeedInfo{
Config: &config.ReplicaConfig{
Cyclic: &config.CyclicConfig{
Expand Down
10 changes: 5 additions & 5 deletions cdc/processor/pipeline/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
package pipeline

import (
stdContext "context"
"context"

"github.com/pingcap/errors"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/puller"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/context"
cdcContext "github.com/pingcap/ticdc/pkg/context"
"github.com/pingcap/ticdc/pkg/pipeline"
"github.com/pingcap/ticdc/pkg/regionspan"
"github.com/pingcap/ticdc/pkg/util"
Expand All @@ -35,7 +35,7 @@ type pullerNode struct {

tableID model.TableID
replicaInfo *model.TableReplicaInfo
cancel stdContext.CancelFunc
cancel context.CancelFunc
wg errgroup.Group
}

Expand All @@ -50,7 +50,7 @@ func newPullerNode(
}
}

func (n *pullerNode) tableSpan(ctx context.Context) []regionspan.Span {
func (n *pullerNode) tableSpan(ctx cdcContext.Context) []regionspan.Span {
// start table puller
config := ctx.ChangefeedVars().Info.Config
spans := make([]regionspan.Span, 0, 4)
Expand All @@ -66,7 +66,7 @@ func (n *pullerNode) Init(ctx pipeline.NodeContext) error {
metricTableResolvedTsGauge := tableResolvedTsGauge.WithLabelValues(ctx.ChangefeedVars().ID, ctx.GlobalVars().CaptureInfo.AdvertiseAddr, n.tableName)
globalConfig := config.GetGlobalServerConfig()
config := ctx.ChangefeedVars().Info.Config
ctxC, cancel := stdContext.WithCancel(ctx)
ctxC, cancel := context.WithCancel(ctx)
ctxC = util.PutTableInfoInCtx(ctxC, n.tableID, n.tableName)
plr := puller.NewPuller(ctxC, ctx.GlobalVars().PDClient, globalConfig.Security, ctx.GlobalVars().KVStorage,
n.replicaInfo.StartTs, n.tableSpan(ctx), n.limitter, config.EnableOldValue)
Expand Down
16 changes: 9 additions & 7 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,13 @@ func (s TableStatus) String() string {
return "Unknown"
}

func (s *TableStatus) load() TableStatus {
// Load TableStatus with THREAD-SAFE
func (s *TableStatus) Load() TableStatus {
return TableStatus(atomic.LoadInt32((*int32)(s)))
}

func (s *TableStatus) store(new TableStatus) {
// Store TableStatus with THREAD-SAFE
func (s *TableStatus) Store(new TableStatus) {
atomic.StoreInt32((*int32)(s), int32(new))
}
Comment on lines +56 to 64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will Load and Store be used by other packages?

Copy link
Contributor Author

@zier-one zier-one May 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, in the new owner, for test only


Expand Down Expand Up @@ -91,7 +93,7 @@ func newSinkNode(sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowContro

func (n *sinkNode) ResolvedTs() model.Ts { return atomic.LoadUint64(&n.resolvedTs) }
func (n *sinkNode) CheckpointTs() model.Ts { return atomic.LoadUint64(&n.checkpointTs) }
func (n *sinkNode) Status() TableStatus { return n.status.load() }
func (n *sinkNode) Status() TableStatus { return n.status.Load() }

func (n *sinkNode) Init(ctx pipeline.NodeContext) error {
// do nothing
Expand All @@ -101,11 +103,11 @@ func (n *sinkNode) Init(ctx pipeline.NodeContext) error {
func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err error) {
defer func() {
if err != nil {
n.status.store(TableStatusStopped)
n.status.Store(TableStatusStopped)
return
}
if n.checkpointTs >= n.targetTs {
n.status.store(TableStatusStopped)
n.status.Store(TableStatusStopped)
err = n.sink.Close()
if err != nil {
err = errors.Trace(err)
Expand Down Expand Up @@ -183,7 +185,7 @@ func (n *sinkNode) Receive(ctx pipeline.NodeContext) error {
event := msg.PolymorphicEvent
if event.RawKV.OpType == model.OpTypeResolved {
if n.status == TableStatusInitializing {
n.status.store(TableStatusRunning)
n.status.Store(TableStatusRunning)
}
failpoint.Inject("ProcessorSyncResolvedError", func() {
failpoint.Return(errors.New("processor sync resolved injected error"))
Expand Down Expand Up @@ -220,7 +222,7 @@ func (n *sinkNode) Receive(ctx pipeline.NodeContext) error {
}

func (n *sinkNode) Destroy(ctx pipeline.NodeContext) error {
n.status.store(TableStatusStopped)
n.status.Store(TableStatusStopped)
n.flowController.Abort()
return n.sink.Close()
}
18 changes: 9 additions & 9 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
package pipeline

import (
stdContext "context"
"context"
"testing"

"github.com/pingcap/check"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/context"
cdcContext "github.com/pingcap/ticdc/pkg/context"
cerrors "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/pipeline"
"github.com/pingcap/ticdc/pkg/util/testleak"
Expand Down Expand Up @@ -54,11 +54,11 @@ func (c *mockFlowController) GetConsumption() uint64 {
return 0
}

func (s *mockSink) Initialize(ctx stdContext.Context, tableInfo []*model.SimpleTableInfo) error {
func (s *mockSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
return nil
}

func (s *mockSink) EmitRowChangedEvents(ctx stdContext.Context, rows ...*model.RowChangedEvent) error {
func (s *mockSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
for _, row := range rows {
s.received = append(s.received, struct {
resolvedTs model.Ts
Expand All @@ -68,19 +68,19 @@ func (s *mockSink) EmitRowChangedEvents(ctx stdContext.Context, rows ...*model.R
return nil
}

func (s *mockSink) EmitDDLEvent(ctx stdContext.Context, ddl *model.DDLEvent) error {
func (s *mockSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
panic("unreachable")
}

func (s *mockSink) FlushRowChangedEvents(ctx stdContext.Context, resolvedTs uint64) (uint64, error) {
func (s *mockSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
s.received = append(s.received, struct {
resolvedTs model.Ts
row *model.RowChangedEvent
}{resolvedTs: resolvedTs})
return resolvedTs, nil
}

func (s *mockSink) EmitCheckpointTs(ctx stdContext.Context, ts uint64) error {
func (s *mockSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
panic("unreachable")
}

Expand All @@ -105,7 +105,7 @@ var _ = check.Suite(&outputSuite{})

func (s *outputSuite) TestStatus(c *check.C) {
defer testleak.AfterTest(c)()
ctx := context.NewContext(stdContext.Background(), &context.GlobalVars{})
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})

// test stop at targetTs
node := newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
Expand Down Expand Up @@ -180,7 +180,7 @@ func (s *outputSuite) TestStatus(c *check.C) {

func (s *outputSuite) TestManyTs(c *check.C) {
defer testleak.AfterTest(c)()
ctx := context.NewContext(stdContext.Background(), &context.GlobalVars{})
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
Expand Down
10 changes: 5 additions & 5 deletions cdc/processor/pipeline/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package pipeline

import (
stdContext "context"
"context"
"time"

"github.com/pingcap/log"
Expand All @@ -24,7 +24,7 @@ import (
"github.com/pingcap/ticdc/cdc/sink"
"github.com/pingcap/ticdc/cdc/sink/common"
serverConfig "github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/context"
cdcContext "github.com/pingcap/ticdc/pkg/context"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/pipeline"
"go.uber.org/zap"
Expand Down Expand Up @@ -68,7 +68,7 @@ type tablePipelineImpl struct {
tableName string // quoted schema and table, used in metircs only

sinkNode *sinkNode
cancel stdContext.CancelFunc
cancel context.CancelFunc
}

// TODO find a better name or avoid using an interface
Expand Down Expand Up @@ -146,15 +146,15 @@ func (t *tablePipelineImpl) Wait() {

// NewTablePipeline creates a table pipeline
// TODO(leoppro): implement a mock kvclient to test the table pipeline
func NewTablePipeline(ctx context.Context,
func NewTablePipeline(ctx cdcContext.Context,
limitter *puller.BlurResourceLimitter,
mounter entry.Mounter,
tableID model.TableID,
tableName string,
replicaInfo *model.TableReplicaInfo,
sink sink.Sink,
targetTs model.Ts) TablePipeline {
ctx, cancel := context.WithCancel(ctx)
ctx, cancel := cdcContext.WithCancel(ctx)
tablePipeline := &tablePipelineImpl{
tableID: tableID,
markTableID: replicaInfo.MarkTableID,
Expand Down