-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
BR: add stream backup meta client #30088
Changes from 3 commits
69c8021
103c130
f473bbf
6f6842a
26056fd
9ddb4d1
f5b26c4
bea4263
60b1382
03871ee
6a46907
ecf6a74
fe2e9dc
1e4f772
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,175 @@ | ||
package stream | ||
|
||
import (
	"bytes"
	"context"
	"encoding/binary"
	"math"

	"github.com/gogo/protobuf/proto"
	"github.com/pingcap/errors"

	backuppb "github.com/pingcap/kvproto/pkg/brpb"
	berrors "github.com/pingcap/tidb/br/pkg/errors"
	"github.com/pingcap/tidb/br/pkg/redact"
	"go.etcd.io/etcd/clientv3"
)
|
||
// MetaDataClient is the client for operations over metadata.
// It embeds an etcd v3 client, so raw etcd operations remain available
// alongside the task-oriented helpers defined on this type.
type MetaDataClient struct {
	*clientv3.Client
}
|
||
// PutTask put a task to the metadata storage. | ||
func (c *MetaDataClient) PutTask(ctx context.Context, task TaskInfo) error { | ||
data, err := task.StreamBackupTaskInfo.Marshal() | ||
if err != nil { | ||
return errors.Annotatef(err, "failed to marshal task %s", task.Name) | ||
} | ||
|
||
ops := make([]clientv3.Op, 0, 2+len(task.Ranges)) | ||
ops = append(ops, clientv3.OpPut(TaskOf(task.GetName()), string(data))) | ||
for _, r := range task.Ranges { | ||
ops = append(ops, clientv3.OpPut(RangeKeyOf(task.Name, r[0]), string(r[1]))) | ||
} | ||
if task.Pausing { | ||
ops = append(ops, clientv3.OpPut(Pause(task.Name), "")) | ||
} | ||
|
||
txn := c.KV.Txn(ctx) | ||
_, err = txn.Then(ops...).Commit() | ||
if err != nil { | ||
return errors.Annotatef(err, "failed to commit the change for task %s", task.Name) | ||
} | ||
return nil | ||
} | ||
|
||
// DeleteTask deletes a task, along with its metadata. | ||
func (c *MetaDataClient) DeleteTask(ctx context.Context, taskName string) error { | ||
_, err := c.KV.Txn(ctx). | ||
Then(clientv3.OpDelete(TaskOf(taskName)), | ||
clientv3.OpDelete(RangesOf(taskName), clientv3.WithPrefix()), | ||
clientv3.OpDelete(Pause(taskName))). | ||
Commit() | ||
if err != nil { | ||
return errors.Annotatef(err, "failed to delete task itself %s", taskName) | ||
} | ||
return nil | ||
} | ||
|
||
func (c *MetaDataClient) PauseTask(ctx context.Context, taskName string) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how we control TiKV pause observe task.🤔 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. According to kenny, pausing / resuming is out of our current scope. 🤔 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. to clarify, it is out of scope because we aren't sure if we want the non-destructive pause/resume or the destructive stop/start, so we delay the feature as it is not essential anyway. |
||
_, err := c.KV.Put(ctx, Pause(taskName), "") | ||
if err != nil { | ||
return errors.Annotatef(err, "failed to pause task %s", taskName) | ||
} | ||
return nil | ||
} | ||
|
||
func (c *MetaDataClient) ResumeTask(ctx context.Context, taskName string) error { | ||
_, err := c.KV.Delete(ctx, Pause(taskName)) | ||
if err != nil { | ||
return errors.Annotatef(err, "failed to resume task %s", taskName) | ||
} | ||
return nil | ||
} | ||
|
||
// GetTask get the basic task handle from the metadata storage. | ||
func (c *MetaDataClient) GetTask(ctx context.Context, taskName string) (*Task, error) { | ||
resp, err := c.Get(ctx, TaskOf(taskName)) | ||
if err != nil { | ||
return nil, errors.Annotatef(err, "failed to fetch task %s", taskName) | ||
} | ||
if len(resp.Kvs) == 0 { | ||
return nil, errors.Annotatef(berrors.ErrPiTRTaskNotFound, "no such task %s", taskName) | ||
} | ||
var taskInfo backuppb.StreamBackupTaskInfo | ||
err = proto.Unmarshal(resp.Kvs[0].Value, &taskInfo) | ||
if err != nil { | ||
return nil, errors.Annotatef(err, "invalid binary presentation of task info (name = %s)", taskName) | ||
} | ||
task := &Task{ | ||
cli: c, | ||
Info: taskInfo, | ||
} | ||
return task, nil | ||
} | ||
|
||
// Task presents a remote "task" object, as returned by a query of task.
// It is associated with the client that created it, hence able to fetch
// remote fields like `ranges` on demand.
type Task struct {
	// cli is the client this handle came from; remote queries
	// (Ranges, Paused, MinNextBackupTS, ...) go through it.
	cli *MetaDataClient
	// Info is the protobuf description of the task as stored in etcd.
	Info backuppb.StreamBackupTaskInfo
}
|
||
// Pause is a shorthand for `metaCli.PauseTask` on this task's own name,
// using the client that created the handle.
func (t *Task) Pause(ctx context.Context) error {
	return t.cli.PauseTask(ctx, t.Info.Name)
}
|
||
// Resume is a shorthand for `metaCli.ResumeTask` on this task's own name,
// using the client that created the handle.
func (t *Task) Resume(ctx context.Context) error {
	return t.cli.ResumeTask(ctx, t.Info.Name)
}
|
||
func (t *Task) Paused(ctx context.Context) (bool, error) { | ||
resp, err := t.cli.KV.Get(ctx, Pause(t.Info.Name), clientv3.WithCountOnly()) | ||
if err != nil { | ||
return false, errors.Annotatef(err, "failed to fetch the status of task %s", t.Info.Name) | ||
} | ||
return resp.Count > 0, nil | ||
} | ||
|
||
// Ranges tries to fetch the range from the metadata storage. | ||
func (t *Task) Ranges(ctx context.Context) (Ranges, error) { | ||
ranges := make(Ranges, 0, 64) | ||
kvs, err := ScanEtcdPrefix(t.cli.Client, RangesOf(t.Info.Name)).AllPages(ctx, 64) | ||
if err != nil { | ||
return nil, errors.Annotatef(err, "failed to fetch ranges of task %s", t.Info.Name) | ||
} | ||
commonPrefix := []byte(RangesOf(t.Info.Name)) | ||
for _, kv := range kvs { | ||
// Given we scan the key `RangesOf(t.Info.Name)` with `WithPrefix()`, | ||
// The prefix should always be RangesOf(t.Info.Name). | ||
// It would be safe to cut the prefix directly. (instead of use TrimPrefix) | ||
// But the rule not apply for the slash. Maybe scan the prefix RangesOf(t.Info.Name) + "/"? | ||
startKey := bytes.TrimPrefix(kv[0][len(commonPrefix):], []byte("/")) | ||
ranges = append(ranges, [...][]byte{startKey, kv[1]}) | ||
} | ||
return ranges, nil | ||
} | ||
|
||
// MinNextBackupTS query the all next backup ts of a store, returning the minimal next backup ts of the store. | ||
// FIXME: this would probably exceed the gRPC max request size (1.5M), maybe we need page scanning, | ||
// but we cannot both do `WithPrefix` and `WithStart`. Maybe impl a `PrefixScanner` with `kv.NextPrefixKey()`? | ||
func (t *Task) MinNextBackupTS(ctx context.Context, store uint64) (uint64, error) { | ||
min := uint64(0xffffffff) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe 0xFFFFFFFFFFFFFFFF ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just found |
||
scanner := ScanEtcdPrefix(t.cli.Client, CheckPointsOf(t.Info.Name, store)) | ||
kvs, err := scanner.AllPages(ctx, 1024) | ||
if err != nil { | ||
return 0, errors.Annotatef(err, "failed to get checkpoints of %s", t.Info.Name) | ||
} | ||
for _, kv := range kvs { | ||
if len(kv[1]) != 8 { | ||
return 0, errors.Annotatef(berrors.ErrPiTRMalformedMetadata, | ||
"the next backup ts of store %d isn't 64bits (it is %d bytes, value = %s)", | ||
store, | ||
len(kv[1]), | ||
redact.Key(kv[1])) | ||
} | ||
nextBackupTS := binary.BigEndian.Uint64(kv[1]) | ||
if nextBackupTS < min { | ||
min = nextBackupTS | ||
} | ||
} | ||
return min, nil | ||
} | ||
|
||
// Step forwards the progress (next_backup_ts) of some region. | ||
// The task should be done by TiKV. This function should only be used for test cases. | ||
func (t *Task) Step(ctx context.Context, store uint64, region uint64, ts uint64) error { | ||
_, err := t.cli.KV.Put(ctx, CheckpointOf(t.Info.Name, store, region), string(encodeUint64(ts))) | ||
if err != nil { | ||
return errors.Annotatef(err, "failed forward the progress of %s to %d", t.Info.Name, ts) | ||
} | ||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,196 @@ | ||
// This package tests the logic in MetaDataClient with an embedded etcd.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto |
||
|
||
package stream_test | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"net" | ||
"net/url" | ||
"testing" | ||
|
||
backuppb "github.com/pingcap/kvproto/pkg/brpb" | ||
"github.com/pingcap/log" | ||
"github.com/pingcap/tidb/br/pkg/logutil" | ||
"github.com/pingcap/tidb/br/pkg/storage" | ||
"github.com/pingcap/tidb/br/pkg/stream" | ||
"github.com/pingcap/tidb/tablecodec" | ||
"github.com/stretchr/testify/require" | ||
"github.com/tikv/client-go/v2/kv" | ||
"go.etcd.io/etcd/clientv3" | ||
"go.etcd.io/etcd/embed" | ||
"go.etcd.io/etcd/mvcc" | ||
) | ||
|
||
func getRandomLocalAddr() url.URL { | ||
listen, err := net.Listen("tcp", "127.0.0.1:") | ||
defer func() { | ||
if err := listen.Close(); err != nil { | ||
log.Panic("failed to release temporary port", logutil.ShortError(err)) | ||
} | ||
}() | ||
if err != nil { | ||
log.Panic("failed to listen random port", logutil.ShortError(err)) | ||
} | ||
u, err := url.Parse(fmt.Sprintf("http://%s", listen.Addr().String())) | ||
if err != nil { | ||
log.Panic("failed to parse url", logutil.ShortError(err)) | ||
} | ||
return *u | ||
} | ||
|
||
func runEtcd(t *testing.T) (*embed.Etcd, *clientv3.Client) { | ||
cfg := embed.NewConfig() | ||
cfg.Dir = t.TempDir() | ||
clientURL := getRandomLocalAddr() | ||
cfg.LCUrls = []url.URL{clientURL} | ||
cfg.LPUrls = []url.URL{getRandomLocalAddr()} | ||
cfg.LogLevel = "fatal" | ||
etcd, err := embed.StartEtcd(cfg) | ||
if err != nil { | ||
log.Panic("failed to start etcd server", logutil.ShortError(err)) | ||
} | ||
<-etcd.Server.ReadyNotify() | ||
cliCfg := clientv3.Config{ | ||
Endpoints: []string{clientURL.String()}, | ||
} | ||
cli, err := clientv3.New(cliCfg) | ||
if err != nil { | ||
log.Panic("failed to connect to etcd server", logutil.ShortError(err)) | ||
} | ||
return etcd, cli | ||
} | ||
|
||
func simpleRanges(tableCount int) stream.Ranges { | ||
ranges := stream.Ranges{} | ||
for i := 0; i < tableCount; i++ { | ||
base := int64(i*2 + 1) | ||
ranges = append(ranges, stream.Range{tablecodec.EncodeTablePrefix(base), tablecodec.EncodeTablePrefix(base + 1)}) | ||
} | ||
return ranges | ||
} | ||
|
||
// simpleTask builds a TaskInfo named `name` with `tableCount` simple ranges,
// backed by the no-op storage and matching every table.
func simpleTask(name string, tableCount int) stream.TaskInfo {
	// The noop:// backend is a fixed, always-parsable URL; the error can be ignored.
	backend, _ := storage.ParseBackend("noop://", nil)
	return stream.TaskInfo{
		StreamBackupTaskInfo: backuppb.StreamBackupTaskInfo{
			Storage:     backend,
			StartTs:     0,
			EndTs:       1000,
			Name:        name,
			TableFilter: []string{"*.*"},
		},
		Ranges:  simpleRanges(tableCount),
		Pausing: false,
	}
}
|
||
func keyIs(t *testing.T, key, value []byte, etcd *embed.Etcd) { | ||
r, err := etcd.Server.KV().Range(key, nil, mvcc.RangeOptions{}) | ||
require.NoError(t, err) | ||
require.Len(t, r.KVs, 1) | ||
require.Equal(t, key, r.KVs[0].Key) | ||
require.Equal(t, value, r.KVs[0].Value) | ||
} | ||
|
||
func keyExists(t *testing.T, key []byte, etcd *embed.Etcd) { | ||
r, err := etcd.Server.KV().Range(key, nil, mvcc.RangeOptions{}) | ||
require.NoError(t, err) | ||
require.Len(t, r.KVs, 1) | ||
} | ||
|
||
func keyNotExists(t *testing.T, key []byte, etcd *embed.Etcd) { | ||
r, err := etcd.Server.KV().Range(key, nil, mvcc.RangeOptions{}) | ||
require.NoError(t, err) | ||
require.Len(t, r.KVs, 0) | ||
} | ||
|
||
func rangeMatches(t *testing.T, ranges stream.Ranges, etcd *embed.Etcd) { | ||
r, err := etcd.Server.KV().Range(ranges[0][0], ranges[len(ranges)-1][1], mvcc.RangeOptions{}) | ||
require.NoError(t, err) | ||
if len(r.KVs) != len(ranges) { | ||
t.Logf("len(ranges) not match len(response.KVs) [%d vs %d]", len(ranges), len(r.KVs)) | ||
t.Fail() | ||
return | ||
} | ||
for i, rng := range ranges { | ||
require.Equalf(t, r.KVs[i].Key, rng[0], "the %dth of ranges not matched.(key)", i) | ||
require.Equalf(t, r.KVs[i].Value, rng[1], "the %dth of ranges not matched.(value)", i) | ||
} | ||
} | ||
|
||
func rangeIsEmpty(t *testing.T, prefix []byte, etcd *embed.Etcd) { | ||
r, err := etcd.Server.KV().Range(prefix, kv.PrefixNextKey(prefix), mvcc.RangeOptions{}) | ||
require.NoError(t, err) | ||
require.Len(t, r.KVs, 0) | ||
} | ||
|
||
// TestAll starts one embedded etcd and runs every sub-test against it,
// sharing the server to avoid paying the startup cost per test.
func TestAll(t *testing.T) {
	etcd, cli := runEtcd(t)
	defer etcd.Server.Stop()
	metaCli := stream.MetaDataClient{Client: cli}
	t.Run("TestBasic", func(t *testing.T) { testBasic(t, metaCli, etcd) })
	t.Run("TestForwardProgress", func(t *testing.T) { testForwardProgress(t, metaCli, etcd) })
}
|
||
// testBasic exercises the full lifecycle of a task — put, pause/resume (both
// via the Task handle and directly via the client), and delete — asserting
// the raw etcd keyspace after each step.
func testBasic(t *testing.T, metaCli stream.MetaDataClient, etcd *embed.Etcd) {
	ctx := context.Background()
	taskName := "two tables"
	task := simpleTask(taskName, 2)
	taskData, err := task.Marshal()
	require.NoError(t, err)
	require.NoError(t, metaCli.PutTask(ctx, task))
	// PutTask must store the marshaled task, leave the pause key absent,
	// and write one range key per range.
	keyIs(t, []byte(stream.TaskOf(taskName)), taskData, etcd)
	keyNotExists(t, []byte(stream.Pause(taskName)), etcd)
	rangeMatches(t, [][2][]byte{
		{[]byte(stream.RangeKeyOf(taskName, tablecodec.EncodeTablePrefix(1))), tablecodec.EncodeTablePrefix(2)},
		{[]byte(stream.RangeKeyOf(taskName, tablecodec.EncodeTablePrefix(3))), tablecodec.EncodeTablePrefix(4)},
	}, etcd)

	remoteTask, err := metaCli.GetTask(ctx, taskName)
	require.NoError(t, err)
	require.NoError(t, remoteTask.Pause(ctx))
	keyExists(t, []byte(stream.Pause(taskName)), etcd)
	// Pausing an already-paused task should be idempotent.
	require.NoError(t, metaCli.PauseTask(ctx, taskName))
	keyExists(t, []byte(stream.Pause(taskName)), etcd)
	paused, err := remoteTask.Paused(ctx)
	require.NoError(t, err)
	require.True(t, paused)
	require.NoError(t, metaCli.ResumeTask(ctx, taskName))
	keyNotExists(t, []byte(stream.Pause(taskName)), etcd)
	// Resuming an already-resumed task should also be idempotent.
	require.NoError(t, metaCli.ResumeTask(ctx, taskName))
	keyNotExists(t, []byte(stream.Pause(taskName)), etcd)
	paused, err = remoteTask.Paused(ctx)
	require.NoError(t, err)
	require.False(t, paused)

	// DeleteTask must remove the task record and all of its range keys.
	require.NoError(t, metaCli.DeleteTask(ctx, taskName))
	keyNotExists(t, []byte(stream.TaskOf(taskName)), etcd)
	rangeIsEmpty(t, []byte(stream.RangesOf(taskName)), etcd)
}
|
||
// testForwardProgress checks checkpoint bookkeeping: Step records per-region
// next-backup-ts values, Ranges pages through many range keys, and
// MinNextBackupTS returns the per-store minimum.
func testForwardProgress(t *testing.T, metaCli stream.MetaDataClient, etcd *embed.Etcd) {
	ctx := context.Background()
	taskName := "many-tables"
	taskInfo := simpleTask(taskName, 65)
	defer func() {
		require.NoError(t, metaCli.DeleteTask(ctx, taskName))
	}()

	require.NoError(t, metaCli.PutTask(ctx, taskInfo))
	task, err := metaCli.GetTask(ctx, taskName)
	require.NoError(t, err)
	// Store 1 owns regions 1 and 2; store 2 owns regions 3 and 5.
	require.NoError(t, task.Step(ctx, 1, 1, 42))
	require.NoError(t, task.Step(ctx, 1, 2, 43))
	require.NoError(t, task.Step(ctx, 2, 3, 41))
	require.NoError(t, task.Step(ctx, 2, 5, 40))
	// 65 ranges exceed one page (Ranges scans 64 entries per page),
	// so this exercises the pagination path.
	rs, err := task.Ranges(ctx)
	require.NoError(t, err)
	require.Equal(t, simpleRanges(65), rs)
	// The minimum over each store's checkpoints: min(42, 43) and min(41, 40).
	store1Checkpoint, err := task.MinNextBackupTS(ctx, 1)
	require.NoError(t, err)
	require.Equal(t, store1Checkpoint, uint64(42))
	store2Checkpoint, err := task.MinNextBackupTS(ctx, 2)
	require.NoError(t, err)
	require.Equal(t, store2Checkpoint, uint64(40))
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add the license