Skip to content

Commit

Permalink
Merge branch 'master' into fix-print-status
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Nov 16, 2021
2 parents 89d9388 + 149e5ac commit 1d1b494
Show file tree
Hide file tree
Showing 48 changed files with 1,238 additions and 1,400 deletions.
12 changes: 6 additions & 6 deletions cdc/processor/pipeline/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type sorterNode struct {

mounter entry.Mounter

wg errgroup.Group
eg errgroup.Group
cancel context.CancelFunc

// The latest resolved ts that sorter has received.
Expand Down Expand Up @@ -81,7 +81,7 @@ func (n *sorterNode) Init(ctx pipeline.NodeContext) error {
zap.String("changefeed-id", ctx.ChangefeedVars().ID), zap.String("table-name", n.tableName))
}
sortDir := ctx.ChangefeedVars().Info.SortDir
err := unified.UnifiedSorterCheckDir(sortDir)
err := unified.CheckDir(sortDir)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -95,19 +95,19 @@ func (n *sorterNode) Init(ctx pipeline.NodeContext) error {
failpoint.Inject("ProcessorAddTableError", func() {
failpoint.Return(errors.New("processor add table injected error"))
})
n.wg.Go(func() error {
n.eg.Go(func() error {
ctx.Throw(errors.Trace(sorter.Run(stdCtx)))
return nil
})
n.wg.Go(func() error {
n.eg.Go(func() error {
// Since the flowController is implemented by `Cond`, it is not cancelable
// by a context. We need to listen on cancellation and aborts the flowController
// manually.
<-stdCtx.Done()
n.flowController.Abort()
return nil
})
n.wg.Go(func() error {
n.eg.Go(func() error {
lastSentResolvedTs := uint64(0)
lastSendResolvedTsTime := time.Now() // the time at which we last sent a resolved-ts.
lastCRTs := uint64(0) // the commit-ts of the last row changed we sent.
Expand Down Expand Up @@ -222,7 +222,7 @@ func (n *sorterNode) Receive(ctx pipeline.NodeContext) error {
func (n *sorterNode) Destroy(ctx pipeline.NodeContext) error {
defer tableMemoryHistogram.DeleteLabelValues(ctx.ChangefeedVars().ID, ctx.GlobalVars().CaptureInfo.AdvertiseAddr)
n.cancel()
return n.wg.Wait()
return n.eg.Wait()
}

func (n *sorterNode) ResolvedTs() model.Ts {
Expand Down
38 changes: 38 additions & 0 deletions cdc/scheduler/agent_mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package scheduler

import (
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/context"
)

// ProcessorMessenger implements how messages should be sent to the owner,
// and should be able to know whether there are any messages not yet acknowledged
// by the owner.
type ProcessorMessenger interface {
	// FinishTableOperation notifies the owner that a table operation has finished.
	// NOTE(review): the bool result presumably has the same meaning as for
	// SendCheckpoint (true = message sent, false = retry later) — confirm
	// against the implementation.
	FinishTableOperation(ctx context.Context, tableID model.TableID) (bool, error)
	// SyncTaskStatuses informs the owner of the processor's current internal state.
	// NOTE(review): bool result presumably means "message sent" — confirm.
	SyncTaskStatuses(ctx context.Context, running, adding, removing []model.TableID) (bool, error)
	// SendCheckpoint sends the owner the processor's local watermarks, i.e., checkpoint-ts and resolved-ts.
	// The bool result is true when the message was sent; false with a nil error
	// means it was not sent this time (for example, the message client could be
	// congested) and the caller should try again later.
	SendCheckpoint(ctx context.Context, checkpointTs model.Ts, resolvedTs model.Ts) (bool, error)

	// Barrier reports whether every message sent so far has been acknowledged by
	// the owner: done is true when nothing is pending, and false while a message
	// is still en route.
	Barrier(ctx context.Context) (done bool)
	// OnOwnerChanged is called when the owner is changed.
	OnOwnerChanged(ctx context.Context, newOwnerCaptureID model.CaptureID)
	// Close closes the messenger and does the necessary cleanup.
	Close() error
}
53 changes: 53 additions & 0 deletions cdc/scheduler/agent_mock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package scheduler

import (
"github.com/pingcap/ticdc/cdc/model"
cdcContext "github.com/pingcap/ticdc/pkg/context"
"github.com/stretchr/testify/mock"
)

// mockProcessorMessenger is a testify mock exposing the same method set as
// ProcessorMessenger. Each method records its invocation and hands back the
// values the test configured through On(...).Return(...).
type mockProcessorMessenger struct {
	mock.Mock
}

// FinishTableOperation records the call and yields the configured (bool, error).
func (m *mockProcessorMessenger) FinishTableOperation(ctx cdcContext.Context, tableID model.TableID) (bool, error) {
	results := m.Called(ctx, tableID)
	sent := results.Bool(0)
	return sent, results.Error(1)
}

// SyncTaskStatuses records the call and yields the configured (bool, error).
func (m *mockProcessorMessenger) SyncTaskStatuses(ctx cdcContext.Context, running, adding, removing []model.TableID) (bool, error) {
	results := m.Called(ctx, running, adding, removing)
	sent := results.Bool(0)
	return sent, results.Error(1)
}

// SendCheckpoint records the call and yields the configured (bool, error).
func (m *mockProcessorMessenger) SendCheckpoint(ctx cdcContext.Context, checkpointTs model.Ts, resolvedTs model.Ts) (bool, error) {
	results := m.Called(ctx, checkpointTs, resolvedTs)
	sent := results.Bool(0)
	return sent, results.Error(1)
}

// Barrier records the call and yields the configured bool.
func (m *mockProcessorMessenger) Barrier(ctx cdcContext.Context) (done bool) {
	return m.Called(ctx).Bool(0)
}

// OnOwnerChanged only records the call; it has no return value.
func (m *mockProcessorMessenger) OnOwnerChanged(ctx cdcContext.Context, newOwnerCaptureID model.CaptureID) {
	m.Called(ctx, newOwnerCaptureID)
}

// Close records the call and yields the configured error.
func (m *mockProcessorMessenger) Close() error {
	return m.Called().Error(0)
}
115 changes: 115 additions & 0 deletions cdc/scheduler/checkpoint_sender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package scheduler

import (
"time"

"github.com/benbjohnson/clock"
"github.com/pingcap/errors"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/context"
"go.uber.org/zap"
)

// checkpointProviderFunc is invoked by the checkpoint sender to obtain the
// latest local watermarks. ok is false when there is no checkpoint to send,
// in which case the other two results are ignored.
type checkpointProviderFunc = func() (checkpointTs, resolvedTs model.Ts, ok bool)

// checkpointSender is used to send checkpoints (as well as other watermarks)
// to the owner's scheduler.
// It DOES NOT need to be thread-safe.
type checkpointSender interface {
	// SendCheckpoint sends a checkpoint to the owner's scheduler.
	// provider is called to get the latest checkpoint when needed; it may not
	// be called at all if an earlier checkpoint is still awaiting acknowledgement.
	// A nil error does not imply that a checkpoint was actually sent.
	SendCheckpoint(ctx context.Context, provider checkpointProviderFunc) error
	// LastSentCheckpointTs returns the last checkpoint sent to the owner's scheduler
	// that has been acknowledged as received.
	LastSentCheckpointTs() model.Ts
}

const (
	// noEnRouteCheckpointTs is the value of enRouteCheckpointTs that means
	// "no checkpoint is currently awaiting acknowledgement".
	// NOTE(review): a real checkpoint-ts of 0 would be indistinguishable from
	// this sentinel — presumably never happens in practice; confirm.
	noEnRouteCheckpointTs = 0
)

// checkpointTsSenderImpl is the checkpointSender implementation returned by
// newCheckpointSender. It keeps at most one checkpoint en route at a time and
// rate-limits sends by sendCheckpointTsInterval.
type checkpointTsSenderImpl struct {
	// communicator delivers messages to the owner and reports acknowledgement.
	communicator ProcessorMessenger
	logger *zap.Logger

	// We use a `clock.Clock` here to make time mockable in unit tests.
	clock clock.Clock

	// The time, as read from `clock`, when we sent the last checkpoint-ts.
	// It stays at the zero value until the first successful send.
	lastSendCheckpointTime time.Time

	// The last checkpoint-ts guaranteed to have been received.
	lastSentCheckpoint model.Ts
	// The last checkpoint-ts already sent
	// but not yet guaranteed to have been received.
	// Equal to noEnRouteCheckpointTs when nothing is pending.
	enRouteCheckpointTs model.Ts

	// config parameter. Read only.
	// Minimum time between two successful checkpoint sends.
	sendCheckpointTsInterval time.Duration
}

// newCheckpointSender builds a checkpointSender that delivers checkpoints
// through communicator, logs through logger, and waits at least
// sendCheckpointTsInterval between two successful sends.
func newCheckpointSender(
	communicator ProcessorMessenger,
	logger *zap.Logger,
	sendCheckpointTsInterval time.Duration,
) checkpointSender {
	impl := new(checkpointTsSenderImpl)
	impl.communicator = communicator
	impl.logger = logger
	// Start with a real clock; unit tests swap this field for a mock.
	impl.clock = clock.New()
	impl.sendCheckpointTsInterval = sendCheckpointTsInterval
	return impl
}

// SendCheckpoint tries to send the latest checkpoint to the owner's scheduler.
//
// At most one checkpoint is en route at a time. If a previously sent checkpoint
// has not been acknowledged yet, the call returns nil without consulting
// provider. Once it is acknowledged it becomes the value reported by
// LastSentCheckpointTs. A new checkpoint is then sent only if provider reports
// ok and at least sendCheckpointTsInterval has elapsed since the last
// successful send.
//
// A nil return value does not imply that a checkpoint was sent; a non-nil error
// is the (traced) error returned by the messenger.
func (s *checkpointTsSenderImpl) SendCheckpoint(ctx context.Context, provider checkpointProviderFunc) error {
	// First check if there is a checkpoint en route to the Owner.
	if s.enRouteCheckpointTs != noEnRouteCheckpointTs {
		if !s.communicator.Barrier(ctx) {
			// Use the injected clock (not the time package) so that the logged
			// duration is consistent with lastSendCheckpointTime, which is also
			// taken from s.clock, and stays meaningful under a mocked clock.
			s.logger.Debug("not sending checkpoint due to pending barrier",
				zap.Duration("since-last-sent", s.clock.Since(s.lastSendCheckpointTime)))
			// We cannot proceed because the last checkpoint has not been acknowledged to have
			// been received.
			return nil
		}
		// The last checkpoint HAS been acknowledged.
		s.lastSentCheckpoint, s.enRouteCheckpointTs = s.enRouteCheckpointTs, noEnRouteCheckpointTs
	}

	checkpointTs, resolvedTs, ok := provider()
	if !ok {
		s.logger.Debug("no checkpoint to send")
		return nil
	}

	// Rate limit: skip this round if the last successful send was too recent.
	if s.clock.Since(s.lastSendCheckpointTime) < s.sendCheckpointTsInterval {
		return nil
	}

	done, err := s.communicator.SendCheckpoint(ctx, checkpointTs, resolvedTs)
	if err != nil {
		return errors.Trace(err)
	}
	// Only a message that was actually sent is tracked as en route; otherwise
	// the state is left untouched so that the next call simply retries.
	if done {
		s.enRouteCheckpointTs = checkpointTs
		s.lastSendCheckpointTime = s.clock.Now()
	}
	return nil
}

// LastSentCheckpointTs returns the last checkpoint sent to the owner's scheduler
// that has been acknowledged as received. A checkpoint that is still en route
// is not reported, and the result is 0 until the first acknowledgement.
func (s *checkpointTsSenderImpl) LastSentCheckpointTs() model.Ts {
	return s.lastSentCheckpoint
}
108 changes: 108 additions & 0 deletions cdc/scheduler/checkpoint_sender_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package scheduler

import (
"testing"
"time"

"github.com/benbjohnson/clock"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/context"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

const (
	// defaultCheckpointIntervalForTesting is the minimum interval between two
	// checkpoint sends that the tests pass to newCheckpointSender.
	defaultCheckpointIntervalForTesting = time.Second * 1
)

// TestCheckpointTsSenderBasics drives a checkpoint sender through its basic
// life cycle against a mocked messenger and a mocked clock: a send that is
// rejected, a send that succeeds, waiting on the barrier, acknowledgement,
// rate limiting, and a send after the interval has elapsed.
// The steps share state and must run in order.
func TestCheckpointTsSenderBasics(t *testing.T) {
	ctx := context.NewBackendContext4Test(false)
	mockCommunicator := &mockProcessorMessenger{}
	sender := newCheckpointSender(mockCommunicator, zap.L(), defaultCheckpointIntervalForTesting)
	mockClock := clock.NewMock()
	sender.(*checkpointTsSenderImpl).clock = mockClock

	startTime := time.Now()
	mockClock.Set(startTime)

	// Test 1: SendCheckpoint returns false (message client could be congested)
	mockCommunicator.On("SendCheckpoint", mock.Anything, model.Ts(1000), model.Ts(1100)).
		Return(false, nil)
	err := sender.SendCheckpoint(ctx, func() (checkpointTs, resolvedTs model.Ts, ok bool) {
		return 1000, 1100, true
	})
	require.NoError(t, err)
	require.Equal(t, model.Ts(0), sender.LastSentCheckpointTs())
	mockCommunicator.AssertExpectations(t)

	// Test 2: SendCheckpoint returns true (message sent successfully).
	// The checkpoint is now en route but not yet acknowledged, so the last
	// sent checkpoint-ts is still 0.
	mockCommunicator.Calls = nil
	mockCommunicator.ExpectedCalls = nil
	mockCommunicator.On("SendCheckpoint", mock.Anything, model.Ts(1100), model.Ts(1200)).
		Return(true, nil)
	err = sender.SendCheckpoint(ctx, func() (checkpointTs, resolvedTs model.Ts, ok bool) {
		return 1100, 1200, true
	})
	require.NoError(t, err)
	require.Equal(t, model.Ts(0), sender.LastSentCheckpointTs())
	mockCommunicator.AssertExpectations(t)

	// Test 3: Barrier returns false (message still en route)
	mockCommunicator.Calls = nil
	mockCommunicator.ExpectedCalls = nil
	// Barrier has a single bool result, so only one return value is configured.
	mockCommunicator.On("Barrier", mock.Anything).Return(false)
	err = sender.SendCheckpoint(ctx, func() (checkpointTs, resolvedTs model.Ts, ok bool) {
		return 0, 0, false // this false here should not have mattered
	})
	require.NoError(t, err)
	require.Equal(t, model.Ts(0), sender.LastSentCheckpointTs())
	mockCommunicator.AssertExpectations(t)

	// Test 4: Barrier returns true (the en-route checkpoint has been acknowledged).
	// The acknowledged checkpoint 1100 becomes the last sent checkpoint-ts.
	mockCommunicator.Calls = nil
	mockCommunicator.ExpectedCalls = nil
	mockCommunicator.On("Barrier", mock.Anything).Return(true)
	err = sender.SendCheckpoint(ctx, func() (checkpointTs, resolvedTs model.Ts, ok bool) {
		// ok == false means there is nothing new to send, so no SendCheckpoint
		// call is expected on the messenger in this step.
		return 0, 0, false
	})
	require.NoError(t, err)
	require.Equal(t, model.Ts(1100), sender.LastSentCheckpointTs())
	mockCommunicator.AssertExpectations(t)

	// Test 5: No checkpoint is sent before the interval has elapsed.
	// No expectation is registered, so any call on the messenger would make
	// the mock panic.
	mockCommunicator.Calls = nil
	mockCommunicator.ExpectedCalls = nil
	err = sender.SendCheckpoint(ctx, func() (checkpointTs, resolvedTs model.Ts, ok bool) {
		return 2000, 2100, true
	})
	require.NoError(t, err)
	require.Equal(t, model.Ts(1100), sender.LastSentCheckpointTs())
	mockCommunicator.AssertExpectations(t)

	// Test 6: A new checkpoint is sent after the interval has elapsed.
	// It is en route only, so the last sent checkpoint-ts is still 1100.
	mockCommunicator.Calls = nil
	mockCommunicator.ExpectedCalls = nil
	mockClock.Add(2 * defaultCheckpointIntervalForTesting)
	mockCommunicator.On("SendCheckpoint", mock.Anything, model.Ts(2200), model.Ts(2300)).
		Return(true, nil)
	err = sender.SendCheckpoint(ctx, func() (checkpointTs, resolvedTs model.Ts, ok bool) {
		return 2200, 2300, true
	})
	require.NoError(t, err)
	require.Equal(t, model.Ts(1100), sender.LastSentCheckpointTs())
	mockCommunicator.AssertExpectations(t)
}
3 changes: 2 additions & 1 deletion cdc/sorter/leveldb/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ TASKS:
// After the delay, this batch can be write forcibly.
reschedulePos = pos
rescheduleDelay = delay
iter.Release()
break TASKS
}
batch.Reset()
force = false
}
}
Expand Down Expand Up @@ -176,6 +176,7 @@ func (clean *CleanerActor) writeRateLimited(
if err != nil {
return 0, errors.Trace(err)
}
batch.Reset()
return 0, nil
}

Expand Down
Loading

0 comments on commit 1d1b494

Please sign in to comment.