diff --git a/br/pkg/lightning/backend/external/bench_test.go b/br/pkg/lightning/backend/external/bench_test.go index 58fe2fa150199..a8d6e6f251d01 100644 --- a/br/pkg/lightning/backend/external/bench_test.go +++ b/br/pkg/lightning/backend/external/bench_test.go @@ -33,9 +33,11 @@ import ( "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/util/intest" + "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/size" "github.com/stretchr/testify/require" "go.uber.org/atomic" + "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -777,6 +779,7 @@ var ( concurrency = flag.Int("concurrency", 100, "concurrency") memoryLimit = flag.Int("memory-limit", 64*units.MiB, "memory limit") skipCreate = flag.Bool("skip-create", false, "skip create files") + fileName = flag.String("file-name", "test", "file name for tests") ) func TestReadFileConcurrently(t *testing.T) { @@ -953,6 +956,7 @@ func mergeStep(t *testing.T, s *mergeTestSuite) { 64*1024, mergeOutput, DefaultBlockSize, + DefaultMemSizeLimit, 8*1024, 1*size.MB, 8*1024, @@ -1078,3 +1082,21 @@ func TestMergeBench(t *testing.T) { testCompareMergeWithContent(t, 8, createAscendingFiles, newMergeStep) testCompareMergeWithContent(t, 8, createEvenlyDistributedFiles, newMergeStep) } + +func TestReadStatFile(t *testing.T) { + ctx := context.Background() + store := openTestingStorage(t) + rd, err := newStatsReader(ctx, store, *fileName, 4096) + require.NoError(t, err) + for { + prop, err := rd.nextProp() + if err == io.EOF { + break + } + logutil.BgLogger().Info("read one prop", + zap.Int("prop len", prop.len()), + zap.Int("prop offset", int(prop.offset)), + zap.Int("prop size", int(prop.size)), + zap.Int("prop keys", int(prop.keys))) + } +} diff --git a/br/pkg/lightning/backend/external/engine.go b/br/pkg/lightning/backend/external/engine.go index 8ff4c5236bd6b..f51289b96b405 100644 --- a/br/pkg/lightning/backend/external/engine.go +++ b/br/pkg/lightning/backend/external/engine.go @@ -193,7 
+193,7 @@ func getFilesReadConcurrency( for i := range statsFiles { result[i] = (endOffs[i] - startOffs[i]) / uint64(ConcurrentReaderBufferSizePerConc) result[i] = max(result[i], 1) - logutil.Logger(ctx).Debug("found hotspot file in getFilesReadConcurrency", + logutil.Logger(ctx).Info("found hotspot file in getFilesReadConcurrency", zap.String("filename", statsFiles[i]), zap.Uint64("startOffset", startOffs[i]), zap.Uint64("endOffset", endOffs[i]), diff --git a/br/pkg/lightning/backend/external/file.go b/br/pkg/lightning/backend/external/file.go index a625248e61f31..07428021576ea 100644 --- a/br/pkg/lightning/backend/external/file.go +++ b/br/pkg/lightning/backend/external/file.go @@ -76,7 +76,7 @@ func (s *KeyValueStore) addEncodedData(data []byte) error { s.rc.currProp.keys >= s.rc.propKeysDist { newProp := *s.rc.currProp s.rc.props = append(s.rc.props, &newProp) - + // reset currProp, and start to update this prop. s.rc.currProp.firstKey = nil s.rc.currProp.offset = s.offset s.rc.currProp.keys = 0 diff --git a/br/pkg/lightning/backend/external/merge.go b/br/pkg/lightning/backend/external/merge.go index 03c1a57ea0cad..71b30a47959bf 100644 --- a/br/pkg/lightning/backend/external/merge.go +++ b/br/pkg/lightning/backend/external/merge.go @@ -23,6 +23,7 @@ func MergeOverlappingFiles( readBufferSize int, newFilePrefix string, blockSize int, + memSizeLimit uint64, writeBatchCount uint64, propSizeDist uint64, propKeysDist uint64, @@ -61,7 +62,7 @@ func MergeOverlappingFiles( readBufferSize, newFilePrefix, uuid.New().String(), - DefaultMemSizeLimit, + memSizeLimit, blockSize, writeBatchCount, propSizeDist, @@ -157,9 +158,5 @@ func mergeOverlappingFilesInternal( }) } - err = writer.Close(ctx) - if err != nil { - return err - } - return nil + return writer.Close(ctx) } diff --git a/br/pkg/lightning/backend/external/onefile_writer.go b/br/pkg/lightning/backend/external/onefile_writer.go index 06d2f5f86df95..d64cc2bbeef4d 100644 --- 
a/br/pkg/lightning/backend/external/onefile_writer.go +++ b/br/pkg/lightning/backend/external/onefile_writer.go @@ -64,7 +64,9 @@ func (w *OneFileWriter) initWriter(ctx context.Context, partSize int64) ( w.statFile = filepath.Join(w.filenamePrefix+statSuffix, "one-file") w.statWriter, err = w.store.Create(ctx, w.statFile, &storage.WriterOption{Concurrency: 20, PartSize: int64(5 * size.MB)}) if err != nil { - _ = w.dataWriter.Close(ctx) + w.logger.Info("create stat writer failed", + zap.Error(err)) + _ = w.dataWriter.Close(ctx) return err } w.logger.Info("one file writer", zap.String("data-file", w.dataFile), zap.String("stat-file", w.statFile)) @@ -103,6 +105,8 @@ func (w *OneFileWriter) WriteRow(ctx context.Context, idxKey, idxVal []byte) err return err } w.rc.reset() + // the new prop should have the same offset as kvStore. + w.rc.currProp.offset = w.kvStore.offset } binary.BigEndian.AppendUint64(buf[:0], uint64(keyLen)) binary.BigEndian.AppendUint64(buf[lengthBytes:lengthBytes], uint64(len(idxVal))) @@ -149,7 +153,7 @@ func (w *OneFileWriter) closeImpl(ctx context.Context) (err error) { err = err1 return } - // 4. close stat writer. 
err2 := w.statWriter.Close(ctx) if err2 != nil { w.logger.Error("Close stat writer failed", zap.Error(err)) diff --git a/br/pkg/lightning/backend/external/onefile_writer_test.go b/br/pkg/lightning/backend/external/onefile_writer_test.go index c347dd1b3395d..672da6ccb26e5 100644 --- a/br/pkg/lightning/backend/external/onefile_writer_test.go +++ b/br/pkg/lightning/backend/external/onefile_writer_test.go @@ -344,3 +344,54 @@ func TestOnefileWriterManyRows(t *testing.T) { require.Equal(t, expected.MaxOverlappingNum, resSummary.MultipleFilesStats[0].MaxOverlappingNum) require.EqualValues(t, expectedTotalSize, resSummary.TotalSize) } + +func TestOnefilePropOffset(t *testing.T) { + seed := time.Now().Unix() + rand.Seed(uint64(seed)) + t.Logf("seed: %d", seed) + ctx := context.Background() + memStore := storage.NewMemStorage() + memSizeLimit := (rand.Intn(10) + 1) * 200 + + // 1. write into one file. + // 2. read stat file and check offset ascending. + writer := NewWriterBuilder(). + SetPropSizeDistance(100). + SetPropKeysDistance(2). + SetBlockSize(memSizeLimit). + SetMemorySizeLimit(uint64(memSizeLimit)). 
+ BuildOneFile(memStore, "/test", "0") + + require.NoError(t, writer.Init(ctx, 5*1024*1024)) + + kvCnt := 10000 + kvs := make([]common.KvPair, kvCnt) + for i := 0; i < kvCnt; i++ { + randLen := rand.Intn(10) + 1 + kvs[i].Key = make([]byte, randLen) + _, err := rand.Read(kvs[i].Key) + require.NoError(t, err) + randLen = rand.Intn(10) + 1 + kvs[i].Val = make([]byte, randLen) + _, err = rand.Read(kvs[i].Val) + require.NoError(t, err) + } + + for _, item := range kvs { + require.NoError(t, writer.WriteRow(ctx, item.Key, item.Val)) + } + + require.NoError(t, writer.Close(ctx)) + + rd, err := newStatsReader(ctx, memStore, "/test/0_stat/one-file", 4096) + require.NoError(t, err) + lastOffset := uint64(0) + for { + prop, err := rd.nextProp() + if err == io.EOF { + break + } + require.GreaterOrEqual(t, prop.offset, lastOffset) + lastOffset = prop.offset + } +} diff --git a/br/pkg/lightning/backend/external/sort_test.go b/br/pkg/lightning/backend/external/sort_test.go index dce2d75ef03d7..01344fb2a66cc 100644 --- a/br/pkg/lightning/backend/external/sort_test.go +++ b/br/pkg/lightning/backend/external/sort_test.go @@ -148,7 +148,9 @@ func TestGlobalSortLocalWithMerge(t *testing.T) { startKey = BytesMin(startKey, s.Min.Clone()) endKey = BytesMax(endKey, s.Max.Clone().Next()) } - + mergeMemSize := (rand.Intn(10) + 1) * 100 + // use random mergeMemSize to test different memLimit of writer. 
+ // reproduce one bug, see https://github.com/pingcap/tidb/issues/49590 for _, group := range dataGroup { require.NoError(t, MergeOverlappingFiles( ctx, @@ -157,7 +159,8 @@ func TestGlobalSortLocalWithMerge(t *testing.T) { int64(5*size.MB), 100, "/test2", - 100, + mergeMemSize, + uint64(mergeMemSize), 8*1024, 100, 2, diff --git a/pkg/ddl/backfilling_merge_sort.go b/pkg/ddl/backfilling_merge_sort.go index c0fdbaffd67eb..da8954d966060 100644 --- a/pkg/ddl/backfilling_merge_sort.go +++ b/pkg/ddl/backfilling_merge_sort.go @@ -107,6 +107,7 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta 64*1024, prefix, external.DefaultBlockSize, + external.DefaultMemSizeLimit, 8*1024, 1*size.MB, 8*1024, diff --git a/pkg/disttask/importinto/task_executor.go b/pkg/disttask/importinto/task_executor.go index 449027bd854fd..c709d86b5c1c6 100644 --- a/pkg/disttask/importinto/task_executor.go +++ b/pkg/disttask/importinto/task_executor.go @@ -309,9 +309,21 @@ func (m *mergeSortStepExecutor) RunSubtask(ctx context.Context, subtask *proto.S logger.Info("merge sort partSize", zap.String("size", units.BytesSize(float64(m.partSize)))) - return external.MergeOverlappingFiles(ctx, sm.DataFiles, m.controller.GlobalSortStore, m.partSize, 64*1024, - prefix, getKVGroupBlockSize(sm.KVGroup), 8*1024, 1*size.MB, 8*1024, - onClose, int(m.taskMeta.Plan.ThreadCnt), false) + return external.MergeOverlappingFiles( + ctx, + sm.DataFiles, + m.controller.GlobalSortStore, + m.partSize, + 64*1024, + prefix, + getKVGroupBlockSize(sm.KVGroup), + external.DefaultMemSizeLimit, + 8*1024, + 1*size.MB, + 8*1024, + onClose, + int(m.taskMeta.Plan.ThreadCnt), + false) } func (m *mergeSortStepExecutor) OnFinished(_ context.Context, subtask *proto.Subtask) error {