Skip to content

Commit

Permalink
log-backup: let the advancer listen on the same range with the task (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
YuJuncen authored Sep 16, 2022
1 parent 4db4197 commit ea58f1a
Show file tree
Hide file tree
Showing 9 changed files with 467 additions and 31 deletions.
22 changes: 14 additions & 8 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ type CheckpointAdvancer struct {

// The concurrently accessed task:
// accessed by both the task listener and ticking.
task *backuppb.StreamBackupTaskInfo
taskMu sync.Mutex
task *backuppb.StreamBackupTaskInfo
taskRange []kv.KeyRange
taskMu sync.Mutex

// The read-only config.
// Once ticking begins, this should not be changed for now.
Expand Down Expand Up @@ -193,12 +194,15 @@ func (c *CheckpointAdvancer) tryAdvance(ctx context.Context, rst RangesSharesTS)
}()
defer utils.PanicToErr(&err)

ranges := CollapseRanges(len(rst.Ranges), func(i int) kv.KeyRange { return rst.Ranges[i] })
ranges := CollapseRanges(len(rst.Ranges), func(i int) kv.KeyRange {
return rst.Ranges[i]
})
workers := utils.NewWorkerPool(4, "sub ranges")
eg, cx := errgroup.WithContext(ctx)
collector := NewClusterCollector(ctx, c.env)
collector.setOnSuccessHook(c.cache.InsertRange)
for _, r := range ranges {
clampedRanges := utils.IntersectAll(ranges, utils.CloneSlice(c.taskRange))
for _, r := range clampedRanges {
r := r
workers.ApplyOnErrorGroup(eg, func() (e error) {
defer c.recordTimeCost("get regions in range", zap.Uint64("checkpoint", rst.TS))()
Expand Down Expand Up @@ -260,9 +264,8 @@ func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context)
// CalculateGlobalCheckpoint calculates the global checkpoint, which won't use the cache.
func (c *CheckpointAdvancer) CalculateGlobalCheckpoint(ctx context.Context) (uint64, error) {
var (
cp = uint64(math.MaxInt64)
// TODO: Use The task range here.
thisRun []kv.KeyRange = []kv.KeyRange{{}}
cp = uint64(math.MaxInt64)
thisRun []kv.KeyRange = c.taskRange
nextRun []kv.KeyRange
)
defer c.recordTimeCost("record all")
Expand Down Expand Up @@ -411,8 +414,11 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
switch e.Type {
case EventAdd:
c.task = e.Info
c.taskRange = CollapseRanges(len(e.Ranges), func(i int) kv.KeyRange { return e.Ranges[i] })
log.Info("added event", zap.Stringer("task", e.Info), zap.Stringer("ranges", logutil.StringifyKeys(c.taskRange)))
case EventDel:
c.task = nil
c.taskRange = nil
c.state = &fullScan{}
if err := c.env.ClearV3GlobalCheckpointForTask(ctx, e.Name); err != nil {
log.Warn("failed to clear global checkpoint", logutil.ShortError(err))
Expand Down Expand Up @@ -475,7 +481,7 @@ func (c *CheckpointAdvancer) onConsistencyCheckTick(s *updateSmallTree) error {
return nil
}
defer c.recordTimeCost("consistency check")()
err := c.cache.ConsistencyCheck()
err := c.cache.ConsistencyCheck(c.taskRange)
if err != nil {
log.Error("consistency check failed! log backup may lose data! rolling back to full scan for saving.", logutil.ShortError(err))
c.state = &fullScan{}
Expand Down
34 changes: 23 additions & 11 deletions br/pkg/streamhelper/advancer_cliext.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/kv"
clientv3 "go.etcd.io/etcd/client/v3"
)

Expand All @@ -36,10 +37,11 @@ func (t EventType) String() string {
}

// TaskEvent describes a change of a log backup task that is delivered to the
// task listener.
type TaskEvent struct {
	// Type is the kind of the event.
	Type EventType
	// Name is the name of the task the event refers to.
	Name string
	// Info is the task description; toTaskEvent fills it by unmarshaling the
	// value of the etcd event.
	Info *backuppb.StreamBackupTaskInfo
	// Ranges are the key ranges of the task.
	Ranges []kv.KeyRange
	// Err holds the error carried by an error event.
	Err error
}

func (t *TaskEvent) String() string {
Expand All @@ -60,7 +62,7 @@ func errorEvent(err error) TaskEvent {
}
}

func toTaskEvent(event *clientv3.Event) (TaskEvent, error) {
func (t AdvancerExt) toTaskEvent(ctx context.Context, event *clientv3.Event) (TaskEvent, error) {
if !bytes.HasPrefix(event.Kv.Key, []byte(PrefixOfTask())) {
return TaskEvent{}, errors.Annotatef(berrors.ErrInvalidArgument, "the path isn't a task path (%s)", string(event.Kv.Key))
}
Expand All @@ -78,13 +80,18 @@ func toTaskEvent(event *clientv3.Event) (TaskEvent, error) {
if err := proto.Unmarshal(event.Kv.Value, te.Info); err != nil {
return TaskEvent{}, err
}
var err error
te.Ranges, err = t.MetaDataClient.TaskByInfo(*te.Info).Ranges(ctx)
if err != nil {
return TaskEvent{}, err
}
return te, nil
}

func eventFromWatch(resp clientv3.WatchResponse) ([]TaskEvent, error) {
func (t AdvancerExt) eventFromWatch(ctx context.Context, resp clientv3.WatchResponse) ([]TaskEvent, error) {
result := make([]TaskEvent, 0, len(resp.Events))
for _, event := range resp.Events {
te, err := toTaskEvent(event)
te, err := t.toTaskEvent(ctx, event)
if err != nil {
te.Type = EventErr
te.Err = err
Expand All @@ -97,7 +104,7 @@ func eventFromWatch(resp clientv3.WatchResponse) ([]TaskEvent, error) {
func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskEvent) {
c := t.Client.Watcher.Watch(ctx, PrefixOfTask(), clientv3.WithPrefix(), clientv3.WithRev(rev))
handleResponse := func(resp clientv3.WatchResponse) bool {
events, err := eventFromWatch(resp)
events, err := t.eventFromWatch(ctx, resp)
if err != nil {
ch <- errorEvent(err)
return false
Expand Down Expand Up @@ -146,10 +153,15 @@ func (t AdvancerExt) getFullTasksAsEvent(ctx context.Context) ([]TaskEvent, int6
}
events := make([]TaskEvent, 0, len(tasks))
for _, task := range tasks {
ranges, err := task.Ranges(ctx)
if err != nil {
return nil, 0, err
}
te := TaskEvent{
Type: EventAdd,
Name: task.Info.Name,
Info: &(task.Info),
Type: EventAdd,
Name: task.Info.Name,
Info: &(task.Info),
Ranges: ranges,
}
events = append(events, te)
}
Expand Down
41 changes: 41 additions & 0 deletions br/pkg/streamhelper/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/pingcap/tidb/br/pkg/streamhelper/config"
"github.com/pingcap/tidb/kv"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -185,3 +186,43 @@ func TestOneStoreFailure(t *testing.T) {
require.NoError(t, adv.OnTick(ctx))
require.Equal(t, cp, env.checkpoint)
}

// TestTaskRanges runs one advancer tick for a task restricted to the key range
// ["0002", "0048") and expects a non-zero checkpoint afterwards, even though
// the regions containing the keys "0000" and "0049" (both outside the task
// range) are left unflushed.
func TestTaskRanges(t *testing.T) {
	log.SetLevel(zapcore.DebugLevel)
	cluster := createFakeCluster(t, 4, true)
	defer fmt.Println(cluster)
	ctx := context.Background()

	cluster.splitAndScatter("0001", "0002", "0012", "0034", "0048")
	cluster.advanceCheckpoints()
	cluster.flushAllExcept("0000", "0049")

	taskRanges := []kv.KeyRange{{StartKey: []byte("0002"), EndKey: []byte("0048")}}
	env := &testEnv{fakeCluster: cluster, testCtx: t, ranges: taskRanges}
	adv := streamhelper.NewCheckpointAdvancer(env)
	adv.StartTaskListener(ctx)

	tick := func() { require.NoError(t, adv.OnTick(ctx)) }
	shouldFinishInTime(t, 10*time.Second, "first advancing", tick)
	// The value returned by advanceCheckpoints is deliberately not compared
	// here: not every region was flushed. Only require that some non-zero
	// checkpoint has been recorded by the environment.
	require.Greater(t, env.getCheckpoint(), uint64(0))
}

// TestTaskRangesWithSplit ticks the advancer twice for a task restricted to
// the key range ["0002", "0048"):
//  1. with splits at "0012", "0034" and "0048" and only the region containing
//     "0049" left unflushed, the first tick must yield a non-zero checkpoint;
//  2. after an extra split at "0002" (the start of the task range), with the
//     regions containing "0000" and "0049" left unflushed, the second tick must
//     yield a strictly greater checkpoint.
func TestTaskRangesWithSplit(t *testing.T) {
	log.SetLevel(zapcore.DebugLevel)
	cluster := createFakeCluster(t, 4, true)
	defer fmt.Println(cluster)
	ctx := context.Background()

	cluster.splitAndScatter("0012", "0034", "0048")
	cluster.advanceCheckpoints()
	cluster.flushAllExcept("0049")

	taskRanges := []kv.KeyRange{{StartKey: []byte("0002"), EndKey: []byte("0048")}}
	env := &testEnv{fakeCluster: cluster, testCtx: t, ranges: taskRanges}
	adv := streamhelper.NewCheckpointAdvancer(env)
	adv.StartTaskListener(ctx)

	tick := func() { require.NoError(t, adv.OnTick(ctx)) }

	shouldFinishInTime(t, 10*time.Second, "first advancing", tick)
	fstCheckpoint := env.getCheckpoint()
	require.Greater(t, fstCheckpoint, uint64(0))

	// Split exactly on the task range's start key, then advance again.
	cluster.splitAndScatter("0002")
	cluster.advanceCheckpoints()
	cluster.flushAllExcept("0000", "0049")

	shouldFinishInTime(t, 10*time.Second, "second advancing", tick)
	require.Greater(t, env.getCheckpoint(), fstCheckpoint)
}
34 changes: 33 additions & 1 deletion br/pkg/streamhelper/basic_lib_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ import (
"github.com/pingcap/kvproto/pkg/errorpb"
logbackup "github.com/pingcap/kvproto/pkg/logbackuppb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/kv"
"go.uber.org/zap"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -155,6 +158,7 @@ func (f *fakeStore) GetLastFlushTSOfRegion(ctx context.Context, in *logbackup.Ge
},
})
}
log.Debug("Get last flush ts of region", zap.Stringer("in", in), zap.Stringer("out", resp))
return resp, nil
}

Expand Down Expand Up @@ -318,6 +322,7 @@ func (f *fakeCluster) advanceCheckpoints() uint64 {
r.fsim.flushedEpoch = 0
})
}
log.Info("checkpoint updated", zap.Uint64("to", minCheckpoint))
return minCheckpoint
}

Expand Down Expand Up @@ -357,7 +362,14 @@ func createFakeCluster(t *testing.T, n int, simEnabled bool) *fakeCluster {
}

// String renders the region for debugging output in the form
// `id(epoch):[start,end);<checkpoint>L<leader>F<flushedEpoch>`, where the
// start and end keys of the range are hex-encoded.
func (r *region) String() string {
	return fmt.Sprintf("%d(%d):[%s,%s);%dL%dF%d",
		r.id,
		r.epoch,
		hex.EncodeToString(r.rng.StartKey),
		hex.EncodeToString(r.rng.EndKey),
		r.checkpoint,
		r.leader,
		r.fsim.flushedEpoch)
}

func (f *fakeStore) String() string {
Expand All @@ -375,6 +387,20 @@ func (f *fakeCluster) flushAll() {
}
}

// flushAllExcept flushes every region of the cluster except those whose key
// range contains at least one of the given keys.
//
// A region is treated as containing a key when StartKey <= key and
// key < EndKey, as reported by utils.CompareBytesExt.
// NOTE(review): the trailing `true` passed for EndKey presumably makes an
// empty end key compare as +infinity — confirm against utils.CompareBytesExt.
func (f *fakeCluster) flushAllExcept(keys ...string) {
	for _, r := range f.regions {
		excluded := false
		// Note: can we make it faster?
		for _, key := range keys {
			k := []byte(key)
			if utils.CompareBytesExt(r.rng.StartKey, false, k, false) > 0 {
				// The key is before the start of the region.
				continue
			}
			if utils.CompareBytesExt(k, false, r.rng.EndKey, true) < 0 {
				excluded = true
				break
			}
		}
		if excluded {
			continue
		}
		r.flush()
	}
}

func (f *fakeStore) flush() {
for _, r := range f.regions {
if r.leader == f.id {
Expand All @@ -400,17 +426,23 @@ type testEnv struct {
*fakeCluster
checkpoint uint64
testCtx *testing.T
ranges []kv.KeyRange

mu sync.Mutex
}

func (t *testEnv) Begin(ctx context.Context, ch chan<- streamhelper.TaskEvent) error {
rngs := t.ranges
if len(rngs) == 0 {
rngs = []kv.KeyRange{{}}
}
tsk := streamhelper.TaskEvent{
Type: streamhelper.EventAdd,
Name: "whole",
Info: &backup.StreamBackupTaskInfo{
Name: "whole",
},
Ranges: rngs,
}
ch <- tsk
return nil
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/streamhelper/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,12 +283,14 @@ func testStreamListening(t *testing.T, metaCli streamhelper.AdvancerExt) {
first := <-ch
require.Equal(t, first.Type, streamhelper.EventAdd)
require.Equal(t, first.Name, taskName)
require.ElementsMatch(t, first.Ranges, simpleRanges(4))
second := <-ch
require.Equal(t, second.Type, streamhelper.EventDel)
require.Equal(t, second.Name, taskName)
third := <-ch
require.Equal(t, third.Type, streamhelper.EventAdd)
require.Equal(t, third.Name, taskName2)
require.ElementsMatch(t, first.Ranges, simpleRanges(4))
forth := <-ch
require.Equal(t, forth.Type, streamhelper.EventDel)
require.Equal(t, forth.Name, taskName2)
Expand Down
Loading

0 comments on commit ea58f1a

Please sign in to comment.