Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BR: add stream backup meta client #30088

Merged
merged 14 commits into from
Dec 7, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions br/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ var (
ErrRestoreRTsConstrain = errors.Normalize("resolved ts constrain violation", errors.RFCCodeText("BR:Restore:ErrRestoreResolvedTsConstrain"))

ErrPiTRInvalidCDCLogFormat = errors.Normalize("invalid cdc log format", errors.RFCCodeText("BR:PiTR:ErrPiTRInvalidCDCLogFormat"))
ErrPiTRTaskNotFound = errors.Normalize("task not found", errors.RFCCodeText("BR:PiTR:ErrTaskNotFound"))
ErrPiTRMalformedMetadata = errors.Normalize("malformed metadata", errors.RFCCodeText("BR:PiTR:ErrMalformedMetadata"))

ErrStorageUnknown = errors.Normalize("unknown external storage error", errors.RFCCodeText("BR:ExternalStorage:ErrStorageUnknown"))
ErrStorageInvalidConfig = errors.Normalize("invalid external storage config", errors.RFCCodeText("BR:ExternalStorage:ErrStorageInvalidConfig"))
Expand Down
174 changes: 174 additions & 0 deletions br/pkg/stream/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// Copyright 2021 PingCAP, Inc. Licensed under Apache-2.0.

package stream
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add the license


import (
	"bytes"
	"context"
	"encoding/binary"
	"math"

	"github.com/gogo/protobuf/proto"
	"github.com/pingcap/errors"

	backuppb "github.com/pingcap/kvproto/pkg/brpb"
	berrors "github.com/pingcap/tidb/br/pkg/errors"
	"github.com/pingcap/tidb/br/pkg/redact"
	"github.com/pingcap/tidb/kv"
	"go.etcd.io/etcd/clientv3"
)

// MetaDataClient is the client for operations over metadata.
// It embeds an etcd client, so every raw etcd operation is also available;
// the methods below add the task-key layout (TaskOf / RangesOf / Pause /
// CheckpointOf) on top of it.
type MetaDataClient struct {
	*clientv3.Client
}

// PutTask put a task to the metadata storage.
// It writes, in one etcd transaction: the marshaled task info under
// TaskOf(name), one key per backup range, and (when requested) the
// pausing marker.
func (c *MetaDataClient) PutTask(ctx context.Context, task TaskInfo) error {
	data, marshalErr := task.StreamBackupTaskInfo.Marshal()
	if marshalErr != nil {
		return errors.Annotatef(marshalErr, "failed to marshal task %s", task.Name)
	}

	// One op for the task itself, one per range, plus possibly the pause marker.
	operations := make([]clientv3.Op, 0, 2+len(task.Ranges))
	operations = append(operations, clientv3.OpPut(TaskOf(task.GetName()), string(data)))
	for _, rng := range task.Ranges {
		// The range start key lives in the etcd key, the end key in the value.
		operations = append(operations, clientv3.OpPut(RangeKeyOf(task.Name, rng.StartKey), string(rng.EndKey)))
	}
	if task.Pausing {
		operations = append(operations, clientv3.OpPut(Pause(task.Name), ""))
	}

	if _, commitErr := c.KV.Txn(ctx).Then(operations...).Commit(); commitErr != nil {
		return errors.Annotatef(commitErr, "failed to commit the change for task %s", task.Name)
	}
	return nil
}

// DeleteTask deletes a task, along with its metadata.
// The task info, all of its range keys, and its pause marker are removed
// atomically in one etcd transaction.
func (c *MetaDataClient) DeleteTask(ctx context.Context, taskName string) error {
	ops := []clientv3.Op{
		clientv3.OpDelete(TaskOf(taskName)),
		clientv3.OpDelete(RangesOf(taskName), clientv3.WithPrefix()),
		clientv3.OpDelete(Pause(taskName)),
	}
	if _, err := c.KV.Txn(ctx).Then(ops...).Commit(); err != nil {
		return errors.Annotatef(err, "failed to delete task itself %s", taskName)
	}
	return nil
}

func (c *MetaDataClient) PauseTask(ctx context.Context, taskName string) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how we control TiKV pause observe task.🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to kenny, pausing / resuming is out of our current scope. 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to clarify, it is out of scope because we aren't sure if we want the non-destructive pause/resume or the destructive stop/start, so we delay the feature as it is not essential anyway.

_, err := c.KV.Put(ctx, Pause(taskName), "")
if err != nil {
return errors.Annotatef(err, "failed to pause task %s", taskName)
}
return nil
}

// ResumeTask resumes a task by deleting its pause marker key.
// Deleting an absent key is a no-op for etcd, so resuming a task that is
// not paused succeeds.
func (c *MetaDataClient) ResumeTask(ctx context.Context, taskName string) error {
	if _, err := c.KV.Delete(ctx, Pause(taskName)); err != nil {
		return errors.Annotatef(err, "failed to resume task %s", taskName)
	}
	return nil
}

// GetTask get the basic task handle from the metadata storage.
// It returns berrors.ErrPiTRTaskNotFound when no task with that name exists,
// and a handle bound to this client otherwise, so remote fields (ranges,
// checkpoints, pause state) can be fetched lazily.
func (c *MetaDataClient) GetTask(ctx context.Context, taskName string) (*Task, error) {
	resp, err := c.Get(ctx, TaskOf(taskName))
	if err != nil {
		return nil, errors.Annotatef(err, "failed to fetch task %s", taskName)
	}
	if len(resp.Kvs) == 0 {
		return nil, errors.Annotatef(berrors.ErrPiTRTaskNotFound, "no such task %s", taskName)
	}
	var info backuppb.StreamBackupTaskInfo
	if err := proto.Unmarshal(resp.Kvs[0].Value, &info); err != nil {
		return nil, errors.Annotatef(err, "invalid binary presentation of task info (name = %s)", taskName)
	}
	return &Task{cli: c, Info: info}, nil
}

// Task presents a remote "task" object.
// returned by a query of task.
// Associated to the client created it, hence be able to fetch remote fields like `ranges`.
type Task struct {
	// cli is the metadata client this handle was created by; used by the
	// methods below to reach etcd.
	cli *MetaDataClient
	// Info is the task descriptor as fetched from the metadata storage.
	Info backuppb.StreamBackupTaskInfo
}

// Pause is a shorthand for `metaCli.PauseTask`.
func (t *Task) Pause(ctx context.Context) error {
	cli := t.cli
	return cli.PauseTask(ctx, t.Info.Name)
}

// Resume is a shorthand for `metaCli.ResumeTask`
func (t *Task) Resume(ctx context.Context) error {
	cli := t.cli
	return cli.ResumeTask(ctx, t.Info.Name)
}

// Paused reports whether the pause marker key of this task exists.
// Only the key count is fetched, not its (empty) value.
func (t *Task) Paused(ctx context.Context) (bool, error) {
	pauseKey := Pause(t.Info.Name)
	resp, err := t.cli.KV.Get(ctx, pauseKey, clientv3.WithCountOnly())
	if err != nil {
		return false, errors.Annotatef(err, "failed to fetch the status of task %s", t.Info.Name)
	}
	paused := resp.Count > 0
	return paused, nil
}

// Ranges tries to fetch the range from the metadata storage.
// Each etcd entry under the RangesOf prefix maps a range start key (stored
// in the etcd key) to its end key (stored in the value).
func (t *Task) Ranges(ctx context.Context) (Ranges, error) {
	ranges := make(Ranges, 0, 64)
	kvs, err := scanEtcdPrefix(t.cli.Client, RangesOf(t.Info.Name)).AllPages(ctx, 64)
	if err != nil {
		return nil, errors.Annotatef(err, "failed to fetch ranges of task %s", t.Info.Name)
	}
	commonPrefix := []byte(RangesOf(t.Info.Name))
	for _, kvp := range kvs {
		// Given we scan the key `RangesOf(t.Info.Name)` with `WithPrefix()`,
		// every returned key must start with that prefix, so cutting it by
		// length is safe (no need for TrimPrefix on the prefix itself).
		// The trailing slash separator, however, may or may not be present,
		// hence the TrimPrefix for "/".
		// TODO(review): consider scanning RangesOf(t.Info.Name) + "/" instead,
		// which would make the slash handling unnecessary.
		startKey := bytes.TrimPrefix(kvp.Key[len(commonPrefix):], []byte("/"))
		ranges = append(ranges, kv.KeyRange{StartKey: startKey, EndKey: kvp.Value})
	}
	return ranges, nil
}

// MinNextBackupTS query the all next backup ts of a store, returning the minimal next backup ts of the store.
func (t *Task) MinNextBackupTS(ctx context.Context, store uint64) (uint64, error) {
min := uint64(0xffffffff)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe 0xFFFFFFFFFFFFFFFF ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just found math.MaxUint64 🦉

scanner := scanEtcdPrefix(t.cli.Client, CheckPointsOf(t.Info.Name, store))
kvs, err := scanner.AllPages(ctx, 1024)
if err != nil {
return 0, errors.Annotatef(err, "failed to get checkpoints of %s", t.Info.Name)
}
for _, kv := range kvs {
if len(kv.Value) != 8 {
return 0, errors.Annotatef(berrors.ErrPiTRMalformedMetadata,
"the next backup ts of store %d isn't 64bits (it is %d bytes, value = %s)",
store,
len(kv.Value),
redact.Key(kv.Value))
}
nextBackupTS := binary.BigEndian.Uint64(kv.Value)
if nextBackupTS < min {
min = nextBackupTS
}
}
return min, nil
}

// Step forwards the progress (next_backup_ts) of some region.
// The task should be done by TiKV. This function should only be used for test cases.
func (t *Task) Step(ctx context.Context, store uint64, region uint64, ts uint64) error {
	_, err := t.cli.KV.Put(ctx, CheckpointOf(t.Info.Name, store, region), string(encodeUint64(ts)))
	if err != nil {
		// FIX: message grammar ("failed forward" -> "failed to forward").
		return errors.Annotatef(err, "failed to forward the progress of %s to %d", t.Info.Name, ts)
	}
	return nil
}
199 changes: 199 additions & 0 deletions br/pkg/stream/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
// This package tests the logic in MetaDataClient with an embedded etcd server.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto


package stream_test

import (
"context"
"fmt"
"net"
"net/url"
"testing"

backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/stream"
"github.com/pingcap/tidb/tablecodec"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/kv"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"go.etcd.io/etcd/mvcc"
)

// getRandomLocalAddr grabs a currently-free local TCP port by listening on
// port 0, releases it again, and returns the address as an http URL.
// NOTE: there is an inherent race — another process may claim the port
// between Close and the caller's use — which is acceptable for tests.
func getRandomLocalAddr() url.URL {
	listen, err := net.Listen("tcp", "127.0.0.1:")
	// BUGFIX: check the error BEFORE registering the deferred Close.
	// Previously the defer was registered first; when Listen failed, the
	// log.Panic unwound through the defer and called Close on a nil
	// listener, panicking again and masking the real error.
	if err != nil {
		log.Panic("failed to listen random port", logutil.ShortError(err))
	}
	defer func() {
		if err := listen.Close(); err != nil {
			log.Panic("failed to release temporary port", logutil.ShortError(err))
		}
	}()
	u, err := url.Parse(fmt.Sprintf("http://%s", listen.Addr().String()))
	if err != nil {
		log.Panic("failed to parse url", logutil.ShortError(err))
	}
	return *u
}

// runEtcd starts an embedded etcd server in a temporary directory, waits
// until it is ready, and returns the server together with a connected client.
func runEtcd(t *testing.T) (*embed.Etcd, *clientv3.Client) {
	clientURL := getRandomLocalAddr()

	cfg := embed.NewConfig()
	cfg.Dir = t.TempDir()
	cfg.LCUrls = []url.URL{clientURL}
	cfg.LPUrls = []url.URL{getRandomLocalAddr()}
	cfg.LogLevel = "fatal"

	server, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Panic("failed to start etcd server", logutil.ShortError(err))
	}
	// Block until the server has finished bootstrapping.
	<-server.Server.ReadyNotify()

	client, err := clientv3.New(clientv3.Config{Endpoints: []string{clientURL.String()}})
	if err != nil {
		log.Panic("failed to connect to etcd server", logutil.ShortError(err))
	}
	return server, client
}

// simpleRanges builds tableCount disjoint table-key ranges: the i-th range
// spans table prefix (2i+1) to (2i+2), leaving a gap between ranges.
func simpleRanges(tableCount int) stream.Ranges {
	rs := stream.Ranges{}
	for i := 0; i < tableCount; i++ {
		tableID := int64(2*i + 1)
		rs = append(rs, stream.Range{
			StartKey: tablecodec.EncodeTablePrefix(tableID),
			EndKey:   tablecodec.EncodeTablePrefix(tableID + 1),
		})
	}
	return rs
}

// simpleTask builds a TaskInfo with a noop storage backend, a full table
// filter, and tableCount simple ranges, suitable for exercising the
// metadata client.
func simpleTask(name string, tableCount int) stream.TaskInfo {
	// The noop backend always parses successfully; the error is ignored.
	backend, _ := storage.ParseBackend("noop://", nil)
	info := backuppb.StreamBackupTaskInfo{
		Storage:     backend,
		StartTs:     0,
		EndTs:       1000,
		Name:        name,
		TableFilter: []string{"*.*"},
	}
	return stream.TaskInfo{
		StreamBackupTaskInfo: info,
		Ranges:               simpleRanges(tableCount),
		Pausing:              false,
	}
}

// keyIs asserts that exactly one KV exists at key and that it holds value,
// reading straight from the embedded server's MVCC store.
func keyIs(t *testing.T, key, value []byte, etcd *embed.Etcd) {
	resp, err := etcd.Server.KV().Range(key, nil, mvcc.RangeOptions{})
	require.NoError(t, err)
	require.Len(t, resp.KVs, 1)
	require.Equal(t, key, resp.KVs[0].Key)
	require.Equal(t, value, resp.KVs[0].Value)
}

// keyExists asserts that exactly one KV exists at key (its value is ignored).
func keyExists(t *testing.T, key []byte, etcd *embed.Etcd) {
	resp, err := etcd.Server.KV().Range(key, nil, mvcc.RangeOptions{})
	require.NoError(t, err)
	require.Len(t, resp.KVs, 1)
}

// keyNotExists asserts that no KV exists at key.
func keyNotExists(t *testing.T, key []byte, etcd *embed.Etcd) {
	resp, err := etcd.Server.KV().Range(key, nil, mvcc.RangeOptions{})
	require.NoError(t, err)
	require.Len(t, resp.KVs, 0)
}

// rangeMatches asserts that the etcd keyspace between the first range's
// start key and the last range's end key contains exactly the given ranges,
// stored as start-key -> end-key entries.
func rangeMatches(t *testing.T, ranges stream.Ranges, etcd *embed.Etcd) {
	r, err := etcd.Server.KV().Range(ranges[0].StartKey, ranges[len(ranges)-1].EndKey, mvcc.RangeOptions{})
	require.NoError(t, err)
	if len(r.KVs) != len(ranges) {
		t.Logf("len(ranges) not match len(response.KVs) [%d vs %d]", len(ranges), len(r.KVs))
		t.Fail()
		return
	}
	for i, rng := range ranges {
		// FIX: testify's require.Equalf takes (t, expected, actual, ...);
		// the arguments were previously swapped, producing misleading
		// "expected/actual" labels in failure messages.
		require.Equalf(t, []byte(rng.StartKey), r.KVs[i].Key, "the %dth of ranges not matched.(key)", i)
		require.Equalf(t, []byte(rng.EndKey), r.KVs[i].Value, "the %dth of ranges not matched.(value)", i)
	}
}

// rangeIsEmpty asserts that no KV exists under the given key prefix.
func rangeIsEmpty(t *testing.T, prefix []byte, etcd *embed.Etcd) {
	resp, err := etcd.Server.KV().Range(prefix, kv.PrefixNextKey(prefix), mvcc.RangeOptions{})
	require.NoError(t, err)
	require.Len(t, resp.KVs, 0)
}

// TestAll boots a single embedded etcd and runs all metadata-client
// subtests against it, so the (comparatively slow) server startup happens
// only once.
func TestAll(t *testing.T) {
	etcd, cli := runEtcd(t)
	defer etcd.Server.Stop()
	metaCli := stream.MetaDataClient{Client: cli}
	t.Run("TestBasic", func(t *testing.T) { testBasic(t, metaCli, etcd) })
	t.Run("TestForwardProgress", func(t *testing.T) { testForwardProgress(t, metaCli, etcd) })
}

// testBasic exercises the whole task life cycle: put, pause/resume
// (including idempotency of resume), and delete, verifying the raw etcd
// keyspace after each step.
func testBasic(t *testing.T, metaCli stream.MetaDataClient, etcd *embed.Etcd) {
	ctx := context.Background()
	taskName := "two tables"
	task := simpleTask(taskName, 2)
	taskData, err := task.Marshal()
	require.NoError(t, err)

	// Put: the task key holds the marshaled info, no pause marker yet,
	// and one etcd entry per range.
	require.NoError(t, metaCli.PutTask(ctx, task))
	keyIs(t, []byte(stream.TaskOf(taskName)), taskData, etcd)
	keyNotExists(t, []byte(stream.Pause(taskName)), etcd)
	rangeMatches(t, []stream.Range{
		{StartKey: []byte(stream.RangeKeyOf(taskName, tablecodec.EncodeTablePrefix(1))), EndKey: tablecodec.EncodeTablePrefix(2)},
		{StartKey: []byte(stream.RangeKeyOf(taskName, tablecodec.EncodeTablePrefix(3))), EndKey: tablecodec.EncodeTablePrefix(4)},
	}, etcd)

	// Pause via the task handle and via the client; both create the marker,
	// and Paused() observes it.
	remoteTask, err := metaCli.GetTask(ctx, taskName)
	require.NoError(t, err)
	require.NoError(t, remoteTask.Pause(ctx))
	keyExists(t, []byte(stream.Pause(taskName)), etcd)
	require.NoError(t, metaCli.PauseTask(ctx, taskName))
	keyExists(t, []byte(stream.Pause(taskName)), etcd)
	paused, err := remoteTask.Paused(ctx)
	require.NoError(t, err)
	require.True(t, paused)

	// Resume removes the marker; resuming an already-resumed task is a no-op.
	require.NoError(t, metaCli.ResumeTask(ctx, taskName))
	keyNotExists(t, []byte(stream.Pause(taskName)), etcd)
	require.NoError(t, metaCli.ResumeTask(ctx, taskName))
	keyNotExists(t, []byte(stream.Pause(taskName)), etcd)
	paused, err = remoteTask.Paused(ctx)
	require.NoError(t, err)
	require.False(t, paused)

	// Delete removes the task key and the whole range prefix.
	require.NoError(t, metaCli.DeleteTask(ctx, taskName))
	keyNotExists(t, []byte(stream.TaskOf(taskName)), etcd)
	rangeIsEmpty(t, []byte(stream.RangesOf(taskName)), etcd)
}

// testForwardProgress puts a task with many ranges (more than one scan page),
// steps per-region checkpoints on two stores, and checks that
// MinNextBackupTS returns the minimum next-backup TS per store.
func testForwardProgress(t *testing.T, metaCli stream.MetaDataClient, etcd *embed.Etcd) {
	ctx := context.Background()
	taskName := "many-tables"
	// 65 ranges > the 64-entry page size used by Task.Ranges, so paging
	// is exercised as well.
	taskInfo := simpleTask(taskName, 65)
	defer func() {
		require.NoError(t, metaCli.DeleteTask(ctx, taskName))
	}()

	require.NoError(t, metaCli.PutTask(ctx, taskInfo))
	task, err := metaCli.GetTask(ctx, taskName)
	require.NoError(t, err)
	require.NoError(t, task.Step(ctx, 1, 1, 42))
	require.NoError(t, task.Step(ctx, 1, 2, 43))
	require.NoError(t, task.Step(ctx, 2, 3, 41))
	require.NoError(t, task.Step(ctx, 2, 5, 40))
	rs, err := task.Ranges(ctx)
	require.NoError(t, err)
	require.Equal(t, simpleRanges(65), rs)
	// FIX: require.Equal takes (t, expected, actual); the two checkpoint
	// assertions previously passed them swapped, which mislabels failure output.
	store1Checkpoint, err := task.MinNextBackupTS(ctx, 1)
	require.NoError(t, err)
	require.Equal(t, uint64(42), store1Checkpoint)
	store2Checkpoint, err := task.MinNextBackupTS(ctx, 2)
	require.NoError(t, err)
	require.Equal(t, uint64(40), store2Checkpoint)
}
Loading