Skip to content

Commit

Permalink
BR: add stream backup meta client (#30088)
Browse files Browse the repository at this point in the history
* stream: initial commit of stream backup

* br/stream: basic logic of stream backup metadata

* br/stream: add prefix scanner

* br/stream: use types from kv package

* br/stream: unexport prefix scanner(remove it to utils package and export it in need)

* br/stream: move kvproto to br-stream branch, added a playground

* br/stream: added some new functions for playground

* br/stream: added a builder for task

* br/stream: remove playground, use math.MaxUInt64

* br: replace kvproto with br-stream

* br/stream: update error code

* br/stream: fix tests

* br/stream: added copyright
  • Loading branch information
YuJuncen authored Dec 7, 2021
1 parent 0b3ec09 commit bcdee32
Show file tree
Hide file tree
Showing 8 changed files with 688 additions and 9 deletions.
3 changes: 3 additions & 0 deletions br/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ var (
ErrRestoreRTsConstrain = errors.Normalize("resolved ts constrain violation", errors.RFCCodeText("BR:Restore:ErrRestoreResolvedTsConstrain"))

ErrPiTRInvalidCDCLogFormat = errors.Normalize("invalid cdc log format", errors.RFCCodeText("BR:PiTR:ErrPiTRInvalidCDCLogFormat"))
ErrPiTRTaskNotFound = errors.Normalize("task not found", errors.RFCCodeText("BR:PiTR:ErrTaskNotFound"))
ErrPiTRInvalidTaskInfo = errors.Normalize("task info is invalid", errors.RFCCodeText("BR:PiTR:ErrInvalidTaskInfo"))
ErrPiTRMalformedMetadata = errors.Normalize("malformed metadata", errors.RFCCodeText("BR:PiTR:ErrMalformedMetadata"))

ErrStorageUnknown = errors.Normalize("unknown external storage error", errors.RFCCodeText("BR:ExternalStorage:ErrStorageUnknown"))
ErrStorageInvalidConfig = errors.Normalize("invalid external storage config", errors.RFCCodeText("BR:ExternalStorage:ErrStorageInvalidConfig"))
Expand Down
172 changes: 172 additions & 0 deletions br/pkg/stream/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Copyright 2021 PingCAP, Inc. Licensed under Apache-2.0.
package stream

import (
"context"
"encoding/binary"
"math"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"

backuppb "github.com/pingcap/kvproto/pkg/brpb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/kv"
"go.etcd.io/etcd/clientv3"
)

// MetaDataClient is the client for operations over metadata.
type MetaDataClient struct {
*clientv3.Client
}

// PutTask put a task to the metadata storage.
func (c *MetaDataClient) PutTask(ctx context.Context, task TaskInfo) error {
data, err := task.PBInfo.Marshal()
if err != nil {
return errors.Annotatef(err, "failed to marshal task %s", task.PBInfo.Name)
}

ops := make([]clientv3.Op, 0, 2+len(task.Ranges))
ops = append(ops, clientv3.OpPut(TaskOf(task.PBInfo.Name), string(data)))
for _, r := range task.Ranges {
ops = append(ops, clientv3.OpPut(RangeKeyOf(task.PBInfo.Name, r.StartKey), string(r.EndKey)))
}
if task.Pausing {
ops = append(ops, clientv3.OpPut(Pause(task.PBInfo.Name), ""))
}

txn := c.KV.Txn(ctx)
_, err = txn.Then(ops...).Commit()
if err != nil {
return errors.Annotatef(err, "failed to commit the change for task %s", task.PBInfo.Name)
}
return nil
}

// DeleteTask deletes a task, along with its metadata.
func (c *MetaDataClient) DeleteTask(ctx context.Context, taskName string) error {
_, err := c.KV.Txn(ctx).
Then(clientv3.OpDelete(TaskOf(taskName)),
clientv3.OpDelete(RangesOf(taskName), clientv3.WithPrefix()),
clientv3.OpDelete(Pause(taskName))).
Commit()
if err != nil {
return errors.Annotatef(err, "failed to delete task itself %s", taskName)
}
return nil
}

func (c *MetaDataClient) PauseTask(ctx context.Context, taskName string) error {
_, err := c.KV.Put(ctx, Pause(taskName), "")
if err != nil {
return errors.Annotatef(err, "failed to pause task %s", taskName)
}
return nil
}

func (c *MetaDataClient) ResumeTask(ctx context.Context, taskName string) error {
_, err := c.KV.Delete(ctx, Pause(taskName))
if err != nil {
return errors.Annotatef(err, "failed to resume task %s", taskName)
}
return nil
}

// GetTask get the basic task handle from the metadata storage.
func (c *MetaDataClient) GetTask(ctx context.Context, taskName string) (*Task, error) {
resp, err := c.Get(ctx, TaskOf(taskName))
if err != nil {
return nil, errors.Annotatef(err, "failed to fetch task %s", taskName)
}
if len(resp.Kvs) == 0 {
return nil, errors.Annotatef(berrors.ErrPiTRTaskNotFound, "no such task %s", taskName)
}
var taskInfo backuppb.StreamBackupTaskInfo
err = proto.Unmarshal(resp.Kvs[0].Value, &taskInfo)
if err != nil {
return nil, errors.Annotatef(err, "invalid binary presentation of task info (name = %s)", taskName)
}
task := &Task{
cli: c,
Info: taskInfo,
}
return task, nil
}

// Task presents a remote "task" object.
// returned by a query of task.
// Associated to the client created it, hence be able to fetch remote fields like `ranges`.
type Task struct {
cli *MetaDataClient
Info backuppb.StreamBackupTaskInfo
}

// Pause is a shorthand for `metaCli.PauseTask`.
func (t *Task) Pause(ctx context.Context) error {
return t.cli.PauseTask(ctx, t.Info.Name)
}

// Resume is a shorthand for `metaCli.ResumeTask`
func (t *Task) Resume(ctx context.Context) error {
return t.cli.ResumeTask(ctx, t.Info.Name)
}

func (t *Task) Paused(ctx context.Context) (bool, error) {
resp, err := t.cli.KV.Get(ctx, Pause(t.Info.Name), clientv3.WithCountOnly())
if err != nil {
return false, errors.Annotatef(err, "failed to fetch the status of task %s", t.Info.Name)
}
return resp.Count > 0, nil
}

// Ranges tries to fetch the range from the metadata storage.
func (t *Task) Ranges(ctx context.Context) (Ranges, error) {
ranges := make(Ranges, 0, 64)
kvs, err := scanEtcdPrefix(t.cli.Client, RangesOf(t.Info.Name)).AllPages(ctx, 64)
if err != nil {
return nil, errors.Annotatef(err, "failed to fetch ranges of task %s", t.Info.Name)
}
commonPrefix := []byte(RangesOf(t.Info.Name))
for _, kvp := range kvs {
// The prefix must matches.
startKey := kvp.Key[len(commonPrefix):]
ranges = append(ranges, kv.KeyRange{StartKey: startKey, EndKey: kvp.Value})
}
return ranges, nil
}

// MinNextBackupTS query the all next backup ts of a store, returning the minimal next backup ts of the store.
func (t *Task) MinNextBackupTS(ctx context.Context, store uint64) (uint64, error) {
min := uint64(math.MaxUint64)
scanner := scanEtcdPrefix(t.cli.Client, CheckPointsOf(t.Info.Name, store))
kvs, err := scanner.AllPages(ctx, 1024)
if err != nil {
return 0, errors.Annotatef(err, "failed to get checkpoints of %s", t.Info.Name)
}
for _, kv := range kvs {
if len(kv.Value) != 8 {
return 0, errors.Annotatef(berrors.ErrPiTRMalformedMetadata,
"the next backup ts of store %d isn't 64bits (it is %d bytes, value = %s)",
store,
len(kv.Value),
redact.Key(kv.Value))
}
nextBackupTS := binary.BigEndian.Uint64(kv.Value)
if nextBackupTS < min {
min = nextBackupTS
}
}
return min, nil
}

// Step forwards the progress (next_backup_ts) of some region.
// The task should be done by TiKV. This function should only be used for test cases.
func (t *Task) Step(ctx context.Context, store uint64, region uint64, ts uint64) error {
_, err := t.cli.KV.Put(ctx, CheckpointOf(t.Info.Name, store, region), string(encodeUint64(ts)))
if err != nil {
return errors.Annotatef(err, "failed forward the progress of %s to %d", t.Info.Name, ts)
}
return nil
}
Loading

0 comments on commit bcdee32

Please sign in to comment.