Skip to content

Commit

Permalink
Merge branch 'master' into fk-on-update
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored Nov 1, 2022
2 parents 9cd2b5d + 08aa32e commit 9289c43
Show file tree
Hide file tree
Showing 61 changed files with 10,333 additions and 9,940 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3429,8 +3429,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:5KLqhDGLc/mtemdS/odfOP717rn8ttsTj3jzZ8TZn9A=",
version = "v2.0.1-0.20221017092635-91be9c6ce6c0",
sum = "h1:s8eJEGI4p/fxFwMBkoJ+4FAEQNQhHR47TZmVW+EEtOE=",
version = "v2.0.1-0.20221026083454-6c9c7c7c5815",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"db.go",
"import.go",
"import_retry.go",
"log_client.go",
"merge.go",
"pipeline_items.go",
"range.go",
Expand Down Expand Up @@ -41,6 +42,7 @@ go_library(
"//br/pkg/stream",
"//br/pkg/summary",
"//br/pkg/utils",
"//br/pkg/utils/iter",
"//config",
"//ddl",
"//ddl/util",
Expand Down Expand Up @@ -131,6 +133,7 @@ go_test(
"//br/pkg/storage",
"//br/pkg/stream",
"//br/pkg/utils",
"//br/pkg/utils/iter",
"//infoschema",
"//kv",
"//meta/autoid",
Expand Down
195 changes: 25 additions & 170 deletions br/pkg/restore/log_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (

var id uint64

type metaMaker = func(files ...*backuppb.DataFileInfo) *backuppb.Metadata

func wm(start, end, minBegin uint64) *backuppb.DataFileInfo {
i := wr(start, end, minBegin)
i.IsMeta = true
Expand Down Expand Up @@ -155,7 +157,7 @@ func (b *mockMetaBuilder) b(useV2 bool) (*storage.LocalStorage, string) {
return s, path
}

func TestReadMetaBetweenTS(t *testing.T) {
func testReadMetaBetweenTSWithVersion(t *testing.T, m metaMaker) {
log.SetLevel(zapcore.DebugLevel)
type Case struct {
items []*backuppb.Metadata
Expand Down Expand Up @@ -251,103 +253,12 @@ func TestReadMetaBetweenTS(t *testing.T) {
}
}

func TestReadMetaBetweenTSV2(t *testing.T) {
log.SetLevel(zapcore.DebugLevel)
type Case struct {
items []*backuppb.Metadata
startTS uint64
endTS uint64
expectedShiftTS uint64
expected []int
}

cases := []Case{
{
items: []*backuppb.Metadata{
m2(wr(4, 10, 3), wr(5, 13, 5)),
m2(dr(1, 3)),
m2(wr(10, 42, 9), dr(6, 9)),
},
startTS: 4,
endTS: 5,
expectedShiftTS: 3,
expected: []int{0, 1},
},
{
items: []*backuppb.Metadata{
m2(wr(1, 100, 1), wr(5, 13, 5), dr(1, 101)),
m2(wr(100, 200, 98), dr(100, 200)),
},
startTS: 50,
endTS: 99,
expectedShiftTS: 1,
expected: []int{0},
},
{
items: []*backuppb.Metadata{
m2(wr(1, 100, 1), wr(5, 13, 5), dr(1, 101)),
m2(wr(100, 200, 98), dr(100, 200)),
m2(wr(200, 300, 200), dr(200, 300)),
},
startTS: 150,
endTS: 199,
expectedShiftTS: 98,
expected: []int{1, 0},
},
{
items: []*backuppb.Metadata{
m2(wr(1, 100, 1), wr(5, 13, 5)),
m2(wr(101, 200, 101), dr(100, 200)),
m2(wr(200, 300, 200), dr(200, 300)),
},
startTS: 150,
endTS: 199,
expectedShiftTS: 101,
expected: []int{1},
},
}

run := func(t *testing.T, c Case) {
req := require.New(t)
ctx := context.Background()
loc, temp := (&mockMetaBuilder{
metas: c.items,
}).b(true)
defer func() {
t.Log("temp dir", temp)
if !t.Failed() {
os.RemoveAll(temp)
}
}()
init := LogFileManagerInit{
StartTS: c.startTS,
RestoreTS: c.endTS,
Storage: loc,
}
cli, err := CreateLogFileManager(ctx, init)
req.Equal(cli.ShiftTS(), c.expectedShiftTS)
req.NoError(err)
metas, err := cli.readStreamMeta(ctx)
req.NoError(err)
actualStoreIDs := make([]int64, 0, len(metas))
for _, meta := range metas {
actualStoreIDs = append(actualStoreIDs, meta.StoreId)
}
expectedStoreIDs := make([]int64, 0, len(c.expected))
for _, meta := range c.expected {
expectedStoreIDs = append(expectedStoreIDs, c.items[meta].StoreId)
}
req.ElementsMatch(actualStoreIDs, expectedStoreIDs)
}

for i, c := range cases {
t.Run(fmt.Sprintf("case#%d", i), func(t *testing.T) {
run(t, c)
})
}
func TestReadMetaBetweenTS(t *testing.T) {
t.Run("MetaV1", func(t *testing.T) { testReadMetaBetweenTSWithVersion(t, m) })
t.Run("MetaV2", func(t *testing.T) { testReadMetaBetweenTSWithVersion(t, m2) })
}

func TestReadFromMetadata(t *testing.T) {
func testReadFromMetadataWithVersion(t *testing.T, m metaMaker) {
type Case struct {
items []*backuppb.Metadata
untilTS uint64
Expand Down Expand Up @@ -413,70 +324,9 @@ func TestReadFromMetadata(t *testing.T) {
}
}

func TestReadFromMetadataV2(t *testing.T) {
type Case struct {
items []*backuppb.Metadata
untilTS uint64
expected []int
}

cases := []Case{
{
items: []*backuppb.Metadata{
m2(wr(4, 10, 3), wr(5, 13, 5)),
m2(dr(1, 3)),
m2(wr(10, 42, 9), dr(6, 9)),
},
untilTS: 10,
expected: []int{0, 1, 2},
},
{
items: []*backuppb.Metadata{
m2(wr(1, 100, 1), wr(5, 13, 5), dr(1, 101)),
m2(wr(100, 200, 98), dr(100, 200)),
},
untilTS: 99,
expected: []int{0},
},
}

run := func(t *testing.T, c Case) {
req := require.New(t)
ctx := context.Background()
loc, temp := (&mockMetaBuilder{
metas: c.items,
}).b(true)
defer func() {
t.Log("temp dir", temp)
if !t.Failed() {
os.RemoveAll(temp)
}
}()

meta := new(StreamMetadataSet)
meta.Helper = stream.NewMetadataHelper()
meta.LoadUntil(ctx, loc, c.untilTS)

var metas []*backuppb.Metadata
for _, m := range meta.metadata {
metas = append(metas, m)
}
actualStoreIDs := make([]int64, 0, len(metas))
for _, meta := range metas {
actualStoreIDs = append(actualStoreIDs, meta.StoreId)
}
expectedStoreIDs := make([]int64, 0, len(c.expected))
for _, meta := range c.expected {
expectedStoreIDs = append(expectedStoreIDs, c.items[meta].StoreId)
}
req.ElementsMatch(actualStoreIDs, expectedStoreIDs)
}

for i, c := range cases {
t.Run(fmt.Sprintf("case#%d", i), func(t *testing.T) {
run(t, c)
})
}
func TestReadFromMetadata(t *testing.T) {
t.Run("MetaV1", func(t *testing.T) { testReadFromMetadataWithVersion(t, m) })
t.Run("MetaV2", func(t *testing.T) { testReadFromMetadataWithVersion(t, m2) })
}

func dataFileInfoMatches(t *testing.T, listA []*backuppb.DataFileInfo, listB ...*backuppb.DataFileInfo) {
Expand Down Expand Up @@ -528,7 +378,7 @@ func formatL(l []*backuppb.DataFileInfo) string {
return "[" + strings.Join(r.Item, ", ") + "]"
}

func TestFileManager(t *testing.T) {
func testFileManagerWithMeta(t *testing.T, m metaMaker) {
type Case struct {
Metadata []*backuppb.Metadata
StartTS int
Expand All @@ -544,9 +394,9 @@ func TestFileManager(t *testing.T) {
cases := []Case{
{
Metadata: []*backuppb.Metadata{
m2(wm(5, 10, 1), dm(1, 8), dr(2, 6), wr(4, 5, 2)),
m2(wr(50, 54, 42), dr(42, 50), wr(70, 78, 0)),
m2(dr(100, 101), wr(102, 104, 100)),
m(wm(5, 10, 1), dm(1, 8), dr(2, 6), wr(4, 5, 2)),
m(wr(50, 54, 42), dr(42, 50), wr(70, 78, 0)),
m(dr(100, 101), wr(102, 104, 100)),
},
StartTS: 2,
RestoreTS: 60,
Expand All @@ -556,9 +406,9 @@ func TestFileManager(t *testing.T) {
},
{
Metadata: []*backuppb.Metadata{
m2(wm(4, 10, 1), dm(1, 8), dr(2, 6), wr(4, 5, 2)),
m2(wr(50, 54, 42), dr(42, 50), wr(70, 78, 0), wm(80, 81, 0), wm(90, 92, 0)),
m2(dr(100, 101), wr(102, 104, 100)),
m(wm(4, 10, 1), dm(1, 8), dr(2, 6), wr(4, 5, 2)),
m(wr(50, 54, 42), dr(42, 50), wr(70, 78, 0), wm(80, 81, 0), wm(90, 92, 0)),
m(dr(100, 101), wr(102, 104, 100)),
},
StartTS: 5,
RestoreTS: 80,
Expand All @@ -570,9 +420,9 @@ func TestFileManager(t *testing.T) {
},
{
Metadata: []*backuppb.Metadata{
m2(wm(5, 10, 1), dm(1, 8), dr(2, 6), wr(4, 5, 2)),
m2(wr(50, 54, 42), dr(42, 50), wr(70, 78, 0), wm(80, 81, 0), wm(90, 92, 0)),
m2(dr(100, 101), wr(102, 104, 100)),
m(wm(5, 10, 1), dm(1, 8), dr(2, 6), wr(4, 5, 2)),
m(wr(50, 54, 42), dr(42, 50), wr(70, 78, 0), wm(80, 81, 0), wm(90, 92, 0)),
m(dr(100, 101), wr(102, 104, 100)),
},
StartTS: 6,
RestoreTS: 80,
Expand Down Expand Up @@ -629,3 +479,8 @@ func TestFileManager(t *testing.T) {
t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) { run(t, c) })
}
}

func TestFileManger(t *testing.T) {
t.Run("MetaV1", func(t *testing.T) { testFileManagerWithMeta(t, m) })
t.Run("MetaV2", func(t *testing.T) { testFileManagerWithMeta(t, m2) })
}
2 changes: 2 additions & 0 deletions br/pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ go_library(
"@com_github_aws_aws_sdk_go//service/s3/s3manager",
"@com_github_azure_azure_sdk_for_go_sdk_azidentity//:azidentity",
"@com_github_azure_azure_sdk_for_go_sdk_storage_azblob//:azblob",
"@com_github_golang_snappy//:snappy",
"@com_github_google_uuid//:uuid",
"@com_github_klauspost_compress//zstd",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_log//:log",
Expand Down
11 changes: 9 additions & 2 deletions br/pkg/storage/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ func (w *withCompression) ReadFile(ctx context.Context, name string) ([]byte, er
}

type compressReader struct {
io.ReadCloser
io.Reader
io.Closer
}

// nolint:interfacer
Expand All @@ -94,14 +95,20 @@ func newInterceptReader(fileReader ExternalFileReader, compressType CompressType
return nil, errors.Trace(err)
}
return &compressReader{
ReadCloser: r,
Reader: r,
Closer: fileReader,
}, nil
}

func (*compressReader) Seek(_ int64, _ int) (int64, error) {
return int64(0), errors.Annotatef(berrors.ErrStorageInvalidConfig, "compressReader doesn't support Seek now")
}

func (c *compressReader) Close() error {
err := c.Closer.Close()
return err
}

type flushStorageWriter struct {
writer io.Writer
flusher flusher
Expand Down
Loading

0 comments on commit 9289c43

Please sign in to comment.