diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 422c4cce3f63e..7e4953bad8306 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -2323,6 +2323,11 @@ func (rc *Client) RestoreMetaKVFiles( } } + log.Info("start to restore meta files", + zap.Int("total files", len(files)), + zap.Int("default files", len(filesInDefaultCF)), + zap.Int("write files", len(filesInWriteCF))) + if err := rc.RestoreMetaKVFilesWithBatchMethod( ctx, SortMetaKVFiles(filesInDefaultCF), @@ -2380,6 +2385,7 @@ func (rc *Client) RestoreMetaKVFilesWithBatchMethod( if i == 0 { rangeMax = f.MaxTs rangeMin = f.MinTs + batchSize = f.Length } else { if f.MinTs <= rangeMax && batchSize+f.Length <= MetaKVBatchSize { rangeMin = mathutil.Min(rangeMin, f.MinTs) @@ -2412,16 +2418,18 @@ func (rc *Client) RestoreMetaKVFilesWithBatchMethod( writeIdx = toWriteIdx } } - if i == len(defaultFiles)-1 { - _, err = restoreBatch(ctx, defaultFiles[defaultIdx:], schemasReplace, defaultKvEntries, math.MaxUint64, updateStats, progressInc, stream.DefaultCF) - if err != nil { - return errors.Trace(err) - } - _, err = restoreBatch(ctx, writeFiles[writeIdx:], schemasReplace, writeKvEntries, math.MaxUint64, updateStats, progressInc, stream.WriteCF) - if err != nil { - return errors.Trace(err) - } - } + } + + // restore the left meta kv files and entries + // Notice: restoreBatch needs to realize the parameter `files` and `kvEntries` might be empty + // Assert: defaultIdx <= len(defaultFiles) && writeIdx <= len(writeFiles) + _, err = restoreBatch(ctx, defaultFiles[defaultIdx:], schemasReplace, defaultKvEntries, math.MaxUint64, updateStats, progressInc, stream.DefaultCF) + if err != nil { + return errors.Trace(err) + } + _, err = restoreBatch(ctx, writeFiles[writeIdx:], schemasReplace, writeKvEntries, math.MaxUint64, updateStats, progressInc, stream.WriteCF) + if err != nil { + return errors.Trace(err) } return nil diff --git a/br/pkg/restore/client_test.go b/br/pkg/restore/client_test.go index f86abe231b13a..d917993391e46 100644 --- a/br/pkg/restore/client_test.go +++ b/br/pkg/restore/client_test.go @@ -632,21 +632,31 @@ func TestDeleteRangeQuery(t *testing.T) { require.Equal(t, querys[3], "INSERT IGNORE INTO mysql.gc_delete_range VALUES (9, 2, '74800000000000000a5f698000000000000001', '74800000000000000a5f698000000000000002', %[1]d),(9, 3, '74800000000000000a5f698000000000000002', '74800000000000000a5f698000000000000003', %[1]d)") } -func TestRestoreMetaKVFilesWithBatchMethod1(t *testing.T) { +func TestRestoreBatchMetaKVFiles(t *testing.T) { + client := restore.MockClient(nil) files := []*backuppb.DataFileInfo{} + // test empty files and entries + next, err := client.RestoreBatchMetaKVFiles(context.Background(), files[0:], nil, make([]*restore.KvEntryWithTS, 0), math.MaxUint64, nil, nil, "") + require.NoError(t, err) + require.Equal(t, 0, len(next)) +} + +func TestRestoreMetaKVFilesWithBatchMethod1(t *testing.T) { + files_default := []*backuppb.DataFileInfo{} + files_write := []*backuppb.DataFileInfo{} batchCount := 0 client := restore.MockClient(nil) err := client.RestoreMetaKVFilesWithBatchMethod( context.Background(), - files, - files, + files_default, + files_write, nil, nil, nil, func( ctx context.Context, - defaultFiles []*backuppb.DataFileInfo, + files []*backuppb.DataFileInfo, schemasReplace *stream.SchemasReplace, entries []*restore.KvEntryWithTS, filterTS uint64, @@ -654,16 +664,19 @@ func TestRestoreMetaKVFilesWithBatchMethod1(t *testing.T) { progressInc func(), cf string, ) ([]*restore.KvEntryWithTS, error) { + require.Equal(t, 0, len(entries)) + require.Equal(t, 0, len(files)) batchCount++ return nil, nil }, ) require.Nil(t, err) - require.Equal(t, batchCount, 0) + require.Equal(t, batchCount, 2) } -func TestRestoreMetaKVFilesWithBatchMethod2(t *testing.T) { - files := []*backuppb.DataFileInfo{ +func TestRestoreMetaKVFilesWithBatchMethod2_default_empty(t *testing.T) { + files_default := []*backuppb.DataFileInfo{} + files_write := []*backuppb.DataFileInfo{ { Path: "f1", MinTs: 100, @@ -671,19 +684,64 @@ func TestRestoreMetaKVFilesWithBatchMethod2(t *testing.T) { }, } batchCount := 0 - result := make(map[int][]*backuppb.DataFileInfo) client := restore.MockClient(nil) err := client.RestoreMetaKVFilesWithBatchMethod( context.Background(), - files, + files_default, + files_write, nil, nil, nil, + func( + ctx context.Context, + files []*backuppb.DataFileInfo, + schemasReplace *stream.SchemasReplace, + entries []*restore.KvEntryWithTS, + filterTS uint64, + updateStats func(kvCount uint64, size uint64), + progressInc func(), + cf string, + ) ([]*restore.KvEntryWithTS, error) { + if len(entries) == 0 && len(files) == 0 { + require.Equal(t, stream.DefaultCF, cf) + batchCount++ + } else { + require.Equal(t, 0, len(entries)) + require.Equal(t, 1, len(files)) + require.Equal(t, uint64(100), files[0].MinTs) + require.Equal(t, stream.WriteCF, cf) + } + require.Equal(t, uint64(math.MaxUint64), filterTS) + return nil, nil + }, + ) + require.Nil(t, err) + require.Equal(t, batchCount, 1) +} + +func TestRestoreMetaKVFilesWithBatchMethod2_write_empty_1(t *testing.T) { + files_default := []*backuppb.DataFileInfo{ + { + Path: "f1", + MinTs: 100, + MaxTs: 120, + }, + } + files_write := []*backuppb.DataFileInfo{} + batchCount := 0 + + client := restore.MockClient(nil) + err := client.RestoreMetaKVFilesWithBatchMethod( + context.Background(), + files_default, + files_write, + nil, + nil, nil, func( ctx context.Context, - fs []*backuppb.DataFileInfo, + files []*backuppb.DataFileInfo, schemasReplace *stream.SchemasReplace, entries []*restore.KvEntryWithTS, filterTS uint64, @@ -691,17 +749,153 @@ func TestRestoreMetaKVFilesWithBatchMethod2(t *testing.T) { progressInc func(), cf string, ) ([]*restore.KvEntryWithTS, error) { - if len(fs) > 0 { - result[batchCount] = fs + if len(entries) == 0 && len(files) == 0 { + require.Equal(t, stream.WriteCF, cf) batchCount++ + } else { + require.Equal(t, 0, len(entries)) + require.Equal(t, 1, len(files)) + require.Equal(t, uint64(100), files[0].MinTs) + require.Equal(t, stream.DefaultCF, cf) } + require.Equal(t, uint64(math.MaxUint64), filterTS) return nil, nil }, ) require.Nil(t, err) require.Equal(t, batchCount, 1) - require.Equal(t, len(result), 1) - require.Equal(t, result[0], files) +} + +func TestRestoreMetaKVFilesWithBatchMethod2_write_empty_2(t *testing.T) { + files_default := []*backuppb.DataFileInfo{ + { + Path: "f1", + MinTs: 100, + MaxTs: 120, + Length: restore.MetaKVBatchSize - 1000, + }, + { + Path: "f2", + MinTs: 110, + MaxTs: 1100, + Length: restore.MetaKVBatchSize, + }, + } + files_write := []*backuppb.DataFileInfo{} + emptyCount := 0 + batchCount := 0 + + client := restore.MockClient(nil) + err := client.RestoreMetaKVFilesWithBatchMethod( + context.Background(), + files_default, + files_write, + nil, + nil, + nil, + func( + ctx context.Context, + files []*backuppb.DataFileInfo, + schemasReplace *stream.SchemasReplace, + entries []*restore.KvEntryWithTS, + filterTS uint64, + updateStats func(kvCount uint64, size uint64), + progressInc func(), + cf string, + ) ([]*restore.KvEntryWithTS, error) { + if len(entries) == 0 && len(files) == 0 { + // write - write + require.Equal(t, stream.WriteCF, cf) + emptyCount++ + if emptyCount == 1 { + require.Equal(t, uint64(110), filterTS) + } else { + require.Equal(t, uint64(math.MaxUint64), filterTS) + } + } else { + // default - default + batchCount++ + require.Equal(t, 1, len(files)) + require.Equal(t, stream.DefaultCF, cf) + if batchCount == 1 { + require.Equal(t, uint64(100), files[0].MinTs) + require.Equal(t, uint64(110), filterTS) + return nil, nil + } + require.Equal(t, 0, len(entries)) + } + return nil, nil + }, + ) + require.Nil(t, err) + require.Equal(t, batchCount, 2) + require.Equal(t, emptyCount, 2) +} + +func TestRestoreMetaKVFilesWithBatchMethod_with_entries(t *testing.T) { + files_default := []*backuppb.DataFileInfo{ + { + Path: "f1", + MinTs: 100, + MaxTs: 120, + Length: restore.MetaKVBatchSize - 1000, + }, + { + Path: "f2", + MinTs: 110, + MaxTs: 1100, + Length: restore.MetaKVBatchSize, + }, + } + files_write := []*backuppb.DataFileInfo{} + emptyCount := 0 + batchCount := 0 + + client := restore.MockClient(nil) + err := client.RestoreMetaKVFilesWithBatchMethod( + context.Background(), + files_default, + files_write, + nil, + nil, + nil, + func( + ctx context.Context, + files []*backuppb.DataFileInfo, + schemasReplace *stream.SchemasReplace, + entries []*restore.KvEntryWithTS, + filterTS uint64, + updateStats func(kvCount uint64, size uint64), + progressInc func(), + cf string, + ) ([]*restore.KvEntryWithTS, error) { + if len(entries) == 0 && len(files) == 0 { + // write - write + require.Equal(t, stream.WriteCF, cf) + emptyCount++ + if emptyCount == 1 { + require.Equal(t, uint64(110), filterTS) + } else { + require.Equal(t, uint64(math.MaxUint64), filterTS) + } + } else { + // default - default + batchCount++ + require.Equal(t, 1, len(files)) + require.Equal(t, stream.DefaultCF, cf) + if batchCount == 1 { + require.Equal(t, uint64(100), files[0].MinTs) + require.Equal(t, uint64(110), filterTS) + return nil, nil + } + require.Equal(t, 0, len(entries)) + } + return nil, nil + }, + ) + require.Nil(t, err) + require.Equal(t, batchCount, 2) + require.Equal(t, emptyCount, 2) } func TestRestoreMetaKVFilesWithBatchMethod3(t *testing.T) { @@ -967,13 +1161,13 @@ func TestRestoreMetaKVFilesWithBatchMethod6(t *testing.T) { Path: "f1", MinTs: 100, MaxTs: 120, - Length: 1, + Length: 100, }, { Path: "f2", MinTs: 100, MaxTs: 120, - Length: restore.MetaKVBatchSize, + Length: restore.MetaKVBatchSize - 100, }, { Path: "f3",