Skip to content

Commit

Permalink
global sort: fix broken prop with onefile writer (#49589)
Browse files Browse the repository at this point in the history
close #49590
  • Loading branch information
ywqzzy authored Dec 20, 2023
1 parent c06f54c commit da460f1
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 15 deletions.
22 changes: 22 additions & 0 deletions br/pkg/lightning/backend/external/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/size"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -777,6 +779,7 @@ var (
concurrency = flag.Int("concurrency", 100, "concurrency")
memoryLimit = flag.Int("memory-limit", 64*units.MiB, "memory limit")
skipCreate = flag.Bool("skip-create", false, "skip create files")
fileName = flag.String("file-name", "test", "file name for tests")
)

func TestReadFileConcurrently(t *testing.T) {
Expand Down Expand Up @@ -953,6 +956,7 @@ func mergeStep(t *testing.T, s *mergeTestSuite) {
64*1024,
mergeOutput,
DefaultBlockSize,
DefaultMemSizeLimit,
8*1024,
1*size.MB,
8*1024,
Expand Down Expand Up @@ -1078,3 +1082,21 @@ func TestMergeBench(t *testing.T) {
testCompareMergeWithContent(t, 8, createAscendingFiles, newMergeStep)
testCompareMergeWithContent(t, 8, createEvenlyDistributedFiles, newMergeStep)
}

// TestReadStatFile reads the stat file named by the -file-name flag and logs
// every range property it contains. It is a manual inspection helper for
// externally-sorted stat files rather than an assertion-based test.
func TestReadStatFile(t *testing.T) {
	ctx := context.Background()
	store := openTestingStorage(t)
	rd, err := newStatsReader(ctx, store, *fileName, 4096)
	// the original discarded this error; a failed open would nil-deref below.
	require.NoError(t, err)
	for {
		prop, err := rd.nextProp()
		if err == io.EOF {
			break
		}
		// any non-EOF error leaves prop invalid and would loop forever.
		require.NoError(t, err)
		logutil.BgLogger().Info("read one prop",
			zap.Int("prop len", prop.len()),
			zap.Int("prop offset", int(prop.offset)),
			zap.Int("prop size", int(prop.size)),
			zap.Int("prop keys", int(prop.keys)))
	}
}
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/external/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func getFilesReadConcurrency(
for i := range statsFiles {
result[i] = (endOffs[i] - startOffs[i]) / uint64(ConcurrentReaderBufferSizePerConc)
result[i] = max(result[i], 1)
logutil.Logger(ctx).Debug("found hotspot file in getFilesReadConcurrency",
logutil.Logger(ctx).Info("found hotspot file in getFilesReadConcurrency",
zap.String("filename", statsFiles[i]),
zap.Uint64("startOffset", startOffs[i]),
zap.Uint64("endOffset", endOffs[i]),
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/external/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (s *KeyValueStore) addEncodedData(data []byte) error {
s.rc.currProp.keys >= s.rc.propKeysDist {
newProp := *s.rc.currProp
s.rc.props = append(s.rc.props, &newProp)

// reset currProp, and start to update this prop.
s.rc.currProp.firstKey = nil
s.rc.currProp.offset = s.offset
s.rc.currProp.keys = 0
Expand Down
9 changes: 3 additions & 6 deletions br/pkg/lightning/backend/external/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func MergeOverlappingFiles(
readBufferSize int,
newFilePrefix string,
blockSize int,
memSizeLimit uint64,
writeBatchCount uint64,
propSizeDist uint64,
propKeysDist uint64,
Expand Down Expand Up @@ -61,7 +62,7 @@ func MergeOverlappingFiles(
readBufferSize,
newFilePrefix,
uuid.New().String(),
DefaultMemSizeLimit,
memSizeLimit,
blockSize,
writeBatchCount,
propSizeDist,
Expand Down Expand Up @@ -157,9 +158,5 @@ func mergeOverlappingFilesInternal(
})
}

err = writer.Close(ctx)
if err != nil {
return err
}
return nil
return writer.Close(ctx)
}
8 changes: 6 additions & 2 deletions br/pkg/lightning/backend/external/onefile_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ func (w *OneFileWriter) initWriter(ctx context.Context, partSize int64) (
w.statFile = filepath.Join(w.filenamePrefix+statSuffix, "one-file")
w.statWriter, err = w.store.Create(ctx, w.statFile, &storage.WriterOption{Concurrency: 20, PartSize: int64(5 * size.MB)})
if err != nil {
_ = w.dataWriter.Close(ctx)
w.logger.Info("create stat writer failed",
zap.Error(err))
err = w.dataWriter.Close(ctx)
return err
}
w.logger.Info("one file writer", zap.String("data-file", w.dataFile), zap.String("stat-file", w.statFile))
Expand Down Expand Up @@ -103,6 +105,8 @@ func (w *OneFileWriter) WriteRow(ctx context.Context, idxKey, idxVal []byte) err
return err
}
w.rc.reset()
// the new prop should have the same offset with kvStore.
w.rc.currProp.offset = w.kvStore.offset
}
binary.BigEndian.AppendUint64(buf[:0], uint64(keyLen))
binary.BigEndian.AppendUint64(buf[lengthBytes:lengthBytes], uint64(len(idxVal)))
Expand Down Expand Up @@ -149,7 +153,7 @@ func (w *OneFileWriter) closeImpl(ctx context.Context) (err error) {
err = err1
return
}
// 4. close stat writer.
// 3. close stat writer.
err2 := w.statWriter.Close(ctx)
if err2 != nil {
w.logger.Error("Close stat writer failed", zap.Error(err))
Expand Down
51 changes: 51 additions & 0 deletions br/pkg/lightning/backend/external/onefile_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,3 +344,54 @@ func TestOnefileWriterManyRows(t *testing.T) {
require.Equal(t, expected.MaxOverlappingNum, resSummary.MultipleFilesStats[0].MaxOverlappingNum)
require.EqualValues(t, expectedTotalSize, resSummary.TotalSize)
}

// TestOnefilePropOffset writes random KVs through the one-file writer with a
// small, randomized memory limit, then reads the produced stat file back and
// checks that range-property offsets are monotonically non-decreasing.
// Regression test for https://github.com/pingcap/tidb/issues/49590.
func TestOnefilePropOffset(t *testing.T) {
	seed := time.Now().Unix()
	rand.Seed(uint64(seed))
	t.Logf("seed: %d", seed)
	ctx := context.Background()
	memStore := storage.NewMemStorage()
	// small random limit forces frequent kvStore resets, which is what
	// previously produced out-of-order prop offsets.
	memSizeLimit := (rand.Intn(10) + 1) * 200

	// 1. write into one file.
	// 2. read stat file and check offset ascending.
	writer := NewWriterBuilder().
		SetPropSizeDistance(100).
		SetPropKeysDistance(2).
		SetBlockSize(memSizeLimit).
		SetMemorySizeLimit(uint64(memSizeLimit)).
		BuildOneFile(memStore, "/test", "0")

	require.NoError(t, writer.Init(ctx, 5*1024*1024))

	kvCnt := 10000
	kvs := make([]common.KvPair, kvCnt)
	for i := 0; i < kvCnt; i++ {
		randLen := rand.Intn(10) + 1
		kvs[i].Key = make([]byte, randLen)
		_, err := rand.Read(kvs[i].Key)
		require.NoError(t, err)
		randLen = rand.Intn(10) + 1
		kvs[i].Val = make([]byte, randLen)
		_, err = rand.Read(kvs[i].Val)
		require.NoError(t, err)
	}

	for _, item := range kvs {
		require.NoError(t, writer.WriteRow(ctx, item.Key, item.Val))
	}

	require.NoError(t, writer.Close(ctx))

	rd, err := newStatsReader(ctx, memStore, "/test/0_stat/one-file", 4096)
	require.NoError(t, err)
	lastOffset := uint64(0)
	for {
		prop, err := rd.nextProp()
		if err == io.EOF {
			break
		}
		// a non-EOF read error must fail the test instead of being
		// silently ignored (prop is invalid and the loop would spin).
		require.NoError(t, err)
		require.GreaterOrEqual(t, prop.offset, lastOffset)
		lastOffset = prop.offset
	}
}
7 changes: 5 additions & 2 deletions br/pkg/lightning/backend/external/sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,9 @@ func TestGlobalSortLocalWithMerge(t *testing.T) {
startKey = BytesMin(startKey, s.Min.Clone())
endKey = BytesMax(endKey, s.Max.Clone().Next())
}

mergeMemSize := (rand.Intn(10) + 1) * 100
// use random mergeMemSize to test different memLimit of writer.
// reproduce one bug, see https://github.com/pingcap/tidb/issues/49590
for _, group := range dataGroup {
require.NoError(t, MergeOverlappingFiles(
ctx,
Expand All @@ -157,7 +159,8 @@ func TestGlobalSortLocalWithMerge(t *testing.T) {
int64(5*size.MB),
100,
"/test2",
100,
mergeMemSize,
uint64(mergeMemSize),
8*1024,
100,
2,
Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/backfilling_merge_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
64*1024,
prefix,
external.DefaultBlockSize,
external.DefaultMemSizeLimit,
8*1024,
1*size.MB,
8*1024,
Expand Down
18 changes: 15 additions & 3 deletions pkg/disttask/importinto/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,9 +309,21 @@ func (m *mergeSortStepExecutor) RunSubtask(ctx context.Context, subtask *proto.S

logger.Info("merge sort partSize", zap.String("size", units.BytesSize(float64(m.partSize))))

return external.MergeOverlappingFiles(ctx, sm.DataFiles, m.controller.GlobalSortStore, m.partSize, 64*1024,
prefix, getKVGroupBlockSize(sm.KVGroup), 8*1024, 1*size.MB, 8*1024,
onClose, int(m.taskMeta.Plan.ThreadCnt), false)
return external.MergeOverlappingFiles(
ctx,
sm.DataFiles,
m.controller.GlobalSortStore,
m.partSize,
64*1024,
prefix,
getKVGroupBlockSize(sm.KVGroup),
external.DefaultMemSizeLimit,
8*1024,
1*size.MB,
8*1024,
onClose,
int(m.taskMeta.Plan.ThreadCnt),
false)
}

func (m *mergeSortStepExecutor) OnFinished(_ context.Context, subtask *proto.Subtask) error {
Expand Down

0 comments on commit da460f1

Please sign in to comment.