Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

context: uniform the import naming of context, part 1 (#1773) #1775

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions cdc/processor/pipeline/cyclic_mark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
package pipeline

import (
stdContext "context"
"context"
"sort"
"sync"

"github.com/google/go-cmp/cmp"
"github.com/pingcap/check"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/context"
cdcContext "github.com/pingcap/ticdc/pkg/context"
"github.com/pingcap/ticdc/pkg/cyclic/mark"
"github.com/pingcap/ticdc/pkg/pipeline"
"github.com/pingcap/ticdc/pkg/util/testleak"
Expand Down Expand Up @@ -131,12 +131,24 @@ func (s *markSuite) TestCyclicMarkNode(c *check.C) {
}

for _, tc := range testCases {
<<<<<<< HEAD
ctx := context.NewContext(stdContext.Background(), &context.Vars{
Config: &config.ReplicaConfig{
Cyclic: &config.CyclicConfig{
Enable: true,
ReplicaID: tc.replicaID,
FilterReplicaID: tc.filterID,
=======
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
Info: &model.ChangeFeedInfo{
Config: &config.ReplicaConfig{
Cyclic: &config.CyclicConfig{
Enable: true,
ReplicaID: tc.replicaID,
FilterReplicaID: tc.filterID,
},
>>>>>>> 58c6ca1f (context: uniform the import naming of context, part 1 (#1773))
},
},
})
Expand Down
18 changes: 15 additions & 3 deletions cdc/processor/pipeline/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,17 @@
package pipeline

import (
stdContext "context"
"context"

"github.com/pingcap/errors"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/puller"
<<<<<<< HEAD
"github.com/pingcap/ticdc/pkg/context"
=======
"github.com/pingcap/ticdc/pkg/config"
cdcContext "github.com/pingcap/ticdc/pkg/context"
>>>>>>> 58c6ca1f (context: uniform the import naming of context, part 1 (#1773))
"github.com/pingcap/ticdc/pkg/pipeline"
"github.com/pingcap/ticdc/pkg/regionspan"
"github.com/pingcap/ticdc/pkg/security"
Expand All @@ -39,7 +44,7 @@ type pullerNode struct {

tableID model.TableID
replicaInfo *model.TableReplicaInfo
cancel stdContext.CancelFunc
cancel context.CancelFunc
wg errgroup.Group
}

Expand All @@ -60,7 +65,7 @@ func newPullerNode(
}
}

func (n *pullerNode) tableSpan(ctx context.Context) []regionspan.Span {
func (n *pullerNode) tableSpan(ctx cdcContext.Context) []regionspan.Span {
// start table puller
enableOldValue := ctx.Vars().Config.EnableOldValue
spans := make([]regionspan.Span, 0, 4)
Expand All @@ -73,9 +78,16 @@ func (n *pullerNode) tableSpan(ctx context.Context) []regionspan.Span {
}

func (n *pullerNode) Init(ctx pipeline.NodeContext) error {
<<<<<<< HEAD
metricTableResolvedTsGauge := tableResolvedTsGauge.WithLabelValues(n.changefeedID, ctx.Vars().CaptureAddr, n.tableName)
enableOldValue := ctx.Vars().Config.EnableOldValue
ctxC, cancel := stdContext.WithCancel(ctx.StdContext())
=======
metricTableResolvedTsGauge := tableResolvedTsGauge.WithLabelValues(ctx.ChangefeedVars().ID, ctx.GlobalVars().CaptureInfo.AdvertiseAddr, n.tableName)
globalConfig := config.GetGlobalServerConfig()
config := ctx.ChangefeedVars().Info.Config
ctxC, cancel := context.WithCancel(ctx)
>>>>>>> 58c6ca1f (context: uniform the import naming of context, part 1 (#1773))
ctxC = util.PutTableInfoInCtx(ctxC, n.tableID, n.tableName)
plr := puller.NewPuller(ctxC, ctx.Vars().PDClient, n.credential, n.kvStorage,
n.replicaInfo.StartTs, n.tableSpan(ctx), n.limitter, enableOldValue)
Expand Down
19 changes: 13 additions & 6 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,13 @@ func (s TableStatus) String() string {
return "Unknown"
}

func (s *TableStatus) load() TableStatus {
// Load TableStatus with THREAD-SAFE
func (s *TableStatus) Load() TableStatus {
return TableStatus(atomic.LoadInt32((*int32)(s)))
}

func (s *TableStatus) store(new TableStatus) {
// Store TableStatus with THREAD-SAFE
func (s *TableStatus) Store(new TableStatus) {
atomic.StoreInt32((*int32)(s), int32(new))
}

Expand Down Expand Up @@ -87,7 +89,7 @@ func newSinkNode(sink sink.Sink, startTs model.Ts, targetTs model.Ts) *sinkNode

func (n *sinkNode) ResolvedTs() model.Ts { return atomic.LoadUint64(&n.resolvedTs) }
func (n *sinkNode) CheckpointTs() model.Ts { return atomic.LoadUint64(&n.checkpointTs) }
func (n *sinkNode) Status() TableStatus { return n.status.load() }
func (n *sinkNode) Status() TableStatus { return n.status.Load() }

func (n *sinkNode) Init(ctx pipeline.NodeContext) error {
// do nothing
Expand All @@ -97,11 +99,11 @@ func (n *sinkNode) Init(ctx pipeline.NodeContext) error {
func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err error) {
defer func() {
if err != nil {
n.status.store(TableStatusStopped)
n.status.Store(TableStatusStopped)
return
}
if n.checkpointTs >= n.targetTs {
n.status.store(TableStatusStopped)
n.status.Store(TableStatusStopped)
err = n.sink.Close()
if err != nil {
err = errors.Trace(err)
Expand Down Expand Up @@ -178,7 +180,7 @@ func (n *sinkNode) Receive(ctx pipeline.NodeContext) error {
event := msg.PolymorphicEvent
if event.RawKV.OpType == model.OpTypeResolved {
if n.status == TableStatusInitializing {
n.status.store(TableStatusRunning)
n.status.Store(TableStatusRunning)
}
failpoint.Inject("ProcessorSyncResolvedError", func() {
failpoint.Return(errors.New("processor sync resolved injected error"))
Expand Down Expand Up @@ -215,6 +217,11 @@ func (n *sinkNode) Receive(ctx pipeline.NodeContext) error {
}

func (n *sinkNode) Destroy(ctx pipeline.NodeContext) error {
<<<<<<< HEAD
n.status.store(TableStatusStopped)
=======
n.status.Store(TableStatusStopped)
n.flowController.Abort()
>>>>>>> 58c6ca1f (context: uniform the import naming of context, part 1 (#1773))
return n.sink.Close()
}
42 changes: 36 additions & 6 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
package pipeline

import (
stdContext "context"
"context"
"testing"

"github.com/pingcap/check"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/context"
cdcContext "github.com/pingcap/ticdc/pkg/context"
cerrors "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/pipeline"
"github.com/pingcap/ticdc/pkg/util/testleak"
Expand All @@ -36,11 +36,33 @@ type mockSink struct {
}
}

<<<<<<< HEAD
func (s *mockSink) Initialize(ctx stdContext.Context, tableInfo []*model.SimpleTableInfo) error {
=======
// mockFlowController is created because a real tableFlowController cannot be used
// we are testing sinkNode by itself.
type mockFlowController struct{}

func (c *mockFlowController) Consume(commitTs uint64, size uint64, blockCallBack func() error) error {
return nil
}

func (c *mockFlowController) Release(resolvedTs uint64) {
}

func (c *mockFlowController) Abort() {
}

func (c *mockFlowController) GetConsumption() uint64 {
return 0
}

func (s *mockSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
>>>>>>> 58c6ca1f (context: uniform the import naming of context, part 1 (#1773))
return nil
}

func (s *mockSink) EmitRowChangedEvents(ctx stdContext.Context, rows ...*model.RowChangedEvent) error {
func (s *mockSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
for _, row := range rows {
s.received = append(s.received, struct {
resolvedTs model.Ts
Expand All @@ -50,19 +72,19 @@ func (s *mockSink) EmitRowChangedEvents(ctx stdContext.Context, rows ...*model.R
return nil
}

func (s *mockSink) EmitDDLEvent(ctx stdContext.Context, ddl *model.DDLEvent) error {
func (s *mockSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
panic("unreachable")
}

func (s *mockSink) FlushRowChangedEvents(ctx stdContext.Context, resolvedTs uint64) (uint64, error) {
func (s *mockSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
s.received = append(s.received, struct {
resolvedTs model.Ts
row *model.RowChangedEvent
}{resolvedTs: resolvedTs})
return resolvedTs, nil
}

func (s *mockSink) EmitCheckpointTs(ctx stdContext.Context, ts uint64) error {
func (s *mockSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
panic("unreachable")
}

Expand All @@ -87,7 +109,11 @@ var _ = check.Suite(&outputSuite{})

func (s *outputSuite) TestStatus(c *check.C) {
defer testleak.AfterTest(c)()
<<<<<<< HEAD
ctx := context.NewContext(stdContext.Background(), &context.Vars{})
=======
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
>>>>>>> 58c6ca1f (context: uniform the import naming of context, part 1 (#1773))

// test stop at targetTs
node := newSinkNode(&mockSink{}, 0, 10)
Expand Down Expand Up @@ -162,7 +188,11 @@ func (s *outputSuite) TestStatus(c *check.C) {

func (s *outputSuite) TestManyTs(c *check.C) {
defer testleak.AfterTest(c)()
<<<<<<< HEAD
ctx := context.NewContext(stdContext.Background(), &context.Vars{})
=======
ctx := cdcContext.NewContext(context.Background(), &cdcContext.GlobalVars{})
>>>>>>> 58c6ca1f (context: uniform the import naming of context, part 1 (#1773))
sink := &mockSink{}
node := newSinkNode(sink, 0, 10)
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, nil, nil)), check.IsNil)
Expand Down
20 changes: 18 additions & 2 deletions cdc/processor/pipeline/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package pipeline

import (
stdContext "context"
"context"
"time"

"github.com/pingcap/ticdc/cdc/sink"
Expand All @@ -23,7 +23,14 @@ import (
"github.com/pingcap/ticdc/cdc/entry"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/puller"
<<<<<<< HEAD
"github.com/pingcap/ticdc/pkg/context"
=======
"github.com/pingcap/ticdc/cdc/sink"
"github.com/pingcap/ticdc/cdc/sink/common"
serverConfig "github.com/pingcap/ticdc/pkg/config"
cdcContext "github.com/pingcap/ticdc/pkg/context"
>>>>>>> 58c6ca1f (context: uniform the import naming of context, part 1 (#1773))
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/pipeline"
"github.com/pingcap/ticdc/pkg/security"
Expand Down Expand Up @@ -63,7 +70,7 @@ type tablePipelineImpl struct {
tableName string // quoted schema and table, used in metircs only

sinkNode *sinkNode
cancel stdContext.CancelFunc
cancel context.CancelFunc
}

// ResolvedTs returns the resolved ts in this table pipeline
Expand Down Expand Up @@ -133,10 +140,14 @@ func (t *tablePipelineImpl) Wait() []error {
// NewTablePipeline creates a table pipeline
// TODO(leoppro): the parameters in this function are too much, try to move some parameters into ctx.Vars().
// TODO(leoppro): implement a mock kvclient to test the table pipeline
<<<<<<< HEAD
func NewTablePipeline(ctx context.Context,
changefeedID model.ChangeFeedID,
credential *security.Credential,
kvStorage tidbkv.Storage,
=======
func NewTablePipeline(ctx cdcContext.Context,
>>>>>>> 58c6ca1f (context: uniform the import naming of context, part 1 (#1773))
limitter *puller.BlurResourceLimitter,
mounter entry.Mounter,
sortEngine model.SortEngine,
Expand All @@ -145,8 +156,13 @@ func NewTablePipeline(ctx context.Context,
tableName string,
replicaInfo *model.TableReplicaInfo,
sink sink.Sink,
<<<<<<< HEAD
targetTs model.Ts) (context.Context, TablePipeline) {
ctx, cancel := context.WithCancel(ctx)
=======
targetTs model.Ts) TablePipeline {
ctx, cancel := cdcContext.WithCancel(ctx)
>>>>>>> 58c6ca1f (context: uniform the import naming of context, part 1 (#1773))
tablePipeline := &tablePipelineImpl{
tableID: tableID,
markTableID: replicaInfo.MarkTableID,
Expand Down