Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

global sort: fix broken prop with onefile writer #49589

Merged
merged 11 commits into from
Dec 20, 2023
22 changes: 22 additions & 0 deletions br/pkg/lightning/backend/external/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/size"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -761,6 +763,7 @@ var (
concurrency = flag.Int("concurrency", 100, "concurrency")
memoryLimit = flag.Int("memory-limit", 64*units.MiB, "memory limit")
skipCreate = flag.Bool("skip-create", false, "skip create files")
fileName = flag.String("file-name", "test", "test")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops, I will rename it

)

func TestReadFileConcurrently(t *testing.T) {
Expand Down Expand Up @@ -937,6 +940,7 @@ func mergeStep(t *testing.T, s *mergeTestSuite) {
64*1024,
mergeOutput,
DefaultBlockSize,
DefaultMemSizeLimit,
8*1024,
1*size.MB,
8*1024,
Expand Down Expand Up @@ -1005,3 +1009,21 @@ func TestMergeBench(t *testing.T) {
testCompareMergeWithContent(t, createAscendingFiles, mergeStep)
testCompareMergeWithContent(t, createEvenlyDistributedFiles, mergeStep)
}

// TestReadStatFile reads the stat file named by the -file-name flag from the
// testing storage and logs every property record it contains. It is a manual
// inspection helper rather than an assertion-based test.
func TestReadStatFile(t *testing.T) {
	ctx := context.Background()
	store := openTestingStorage(t)
	// Fail fast instead of dereferencing a nil reader when the stat file
	// cannot be opened (the error was previously discarded with `_`).
	rd, err := newStatsReader(ctx, store, *fileName, 4096)
	require.NoError(t, err)
	for {
		prop, err := rd.nextProp()
		if err == io.EOF {
			break
		}
		// Any error other than EOF is a real read/decode failure; surface it
		// instead of panicking on a nil prop below.
		require.NoError(t, err)
		logutil.BgLogger().Info("read one prop",
			zap.Int("prop len", prop.len()),
			zap.Int("prop offset", int(prop.offset)),
			zap.Int("prop size", int(prop.size)),
			zap.Int("prop keys", int(prop.keys)))
	}
}
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/external/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func getFilesReadConcurrency(
for i := range statsFiles {
result[i] = (endOffs[i] - startOffs[i]) / uint64(ConcurrentReaderBufferSizePerConc)
result[i] = max(result[i], 1)
logutil.Logger(ctx).Debug("found hotspot file in getFilesReadConcurrency",
logutil.Logger(ctx).Info("found hotspot file in getFilesReadConcurrency",
zap.String("filename", statsFiles[i]),
zap.Uint64("startOffset", startOffs[i]),
zap.Uint64("endOffset", endOffs[i]),
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/external/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (s *KeyValueStore) addEncodedData(data []byte) error {
s.rc.currProp.keys >= s.rc.propKeysDist {
newProp := *s.rc.currProp
s.rc.props = append(s.rc.props, &newProp)

// reset currProp, and start to update this prop.
s.rc.currProp.firstKey = nil
s.rc.currProp.offset = s.offset
s.rc.currProp.keys = 0
Expand Down
9 changes: 3 additions & 6 deletions br/pkg/lightning/backend/external/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func MergeOverlappingFiles(
readBufferSize int,
newFilePrefix string,
blockSize int,
memSizeLimit uint64,
writeBatchCount uint64,
propSizeDist uint64,
propKeysDist uint64,
Expand Down Expand Up @@ -61,7 +62,7 @@ func MergeOverlappingFiles(
readBufferSize,
newFilePrefix,
uuid.New().String(),
DefaultMemSizeLimit,
memSizeLimit,
blockSize,
writeBatchCount,
propSizeDist,
Expand Down Expand Up @@ -157,9 +158,5 @@ func mergeOverlappingFilesInternal(
})
}

err = writer.Close(ctx)
if err != nil {
return err
}
return nil
return writer.Close(ctx)
}
8 changes: 6 additions & 2 deletions br/pkg/lightning/backend/external/onefile_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ func (w *OneFileWriter) initWriter(ctx context.Context, partSize int64) (
w.statFile = filepath.Join(w.filenamePrefix+statSuffix, "one-file")
w.statWriter, err = w.store.Create(ctx, w.statFile, &storage.WriterOption{Concurrency: 20, PartSize: int64(5 * size.MB)})
if err != nil {
_ = w.dataWriter.Close(ctx)
w.logger.Info("create stat writer failed",
zap.Error(err))
err = w.dataWriter.Close(ctx)
return err
}
w.logger.Info("one file writer", zap.String("data-file", w.dataFile), zap.String("stat-file", w.statFile))
Expand Down Expand Up @@ -103,6 +105,8 @@ func (w *OneFileWriter) WriteRow(ctx context.Context, idxKey, idxVal []byte) err
return err
}
w.rc.reset()
// the new prop should have the same offset as kvStore.
ywqzzy marked this conversation as resolved.
Show resolved Hide resolved
w.rc.currProp.offset = w.kvStore.offset
}
binary.BigEndian.AppendUint64(buf[:0], uint64(keyLen))
binary.BigEndian.AppendUint64(buf[lengthBytes:lengthBytes], uint64(len(idxVal)))
Expand Down Expand Up @@ -149,7 +153,7 @@ func (w *OneFileWriter) closeImpl(ctx context.Context) (err error) {
err = err1
return
}
// 4. close stat writer.
// 3. close stat writer.
err2 := w.statWriter.Close(ctx)
if err2 != nil {
w.logger.Error("Close stat writer failed", zap.Error(err))
Expand Down
51 changes: 51 additions & 0 deletions br/pkg/lightning/backend/external/onefile_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,3 +344,54 @@ func TestOnefileWriterManyRows(t *testing.T) {
require.Equal(t, expected.MaxOverlappingNum, resSummary.MultipleFilesStats[0].MaxOverlappingNum)
require.EqualValues(t, expectedTotalSize, resSummary.TotalSize)
}

// TestOnefilePropOffset writes random KV pairs through the one-file writer
// with a small, randomized memory/block size (to force frequent flushes and
// exercise the prop-offset reset path), then reads the resulting stat file
// back and checks that property offsets are monotonically non-decreasing.
func TestOnefilePropOffset(t *testing.T) {
	// Log the seed so a failing randomized run can be reproduced.
	seed := time.Now().Unix()
	rand.Seed(uint64(seed))
	t.Logf("seed: %d", seed)
	ctx := context.Background()
	memStore := storage.NewMemStorage()
	// Small random limit (200..2000 bytes) makes the writer flush often,
	// which is what previously produced out-of-order prop offsets.
	memSizeLimit := (rand.Intn(10) + 1) * 200

	// 1. write into one file.
	// 2. read stat file and check offset ascending.
	writer := NewWriterBuilder().
		SetPropSizeDistance(100).
		SetPropKeysDistance(2).
		SetBlockSize(memSizeLimit).
		SetMemorySizeLimit(uint64(memSizeLimit)).
		BuildOneFile(memStore, "/test", "0")

	require.NoError(t, writer.Init(ctx, 5*1024*1024))

	kvCnt := 10000
	kvs := make([]common.KvPair, kvCnt)
	for i := 0; i < kvCnt; i++ {
		randLen := rand.Intn(10) + 1
		kvs[i].Key = make([]byte, randLen)
		_, err := rand.Read(kvs[i].Key)
		require.NoError(t, err)
		randLen = rand.Intn(10) + 1
		kvs[i].Val = make([]byte, randLen)
		_, err = rand.Read(kvs[i].Val)
		require.NoError(t, err)
	}

	for _, item := range kvs {
		require.NoError(t, writer.WriteRow(ctx, item.Key, item.Val))
	}

	require.NoError(t, writer.Close(ctx))

	rd, err := newStatsReader(ctx, memStore, "/test/0_stat/one-file", 4096)
	require.NoError(t, err)
	lastOffset := uint64(0)
	for {
		prop, err := rd.nextProp()
		if err == io.EOF {
			break
		}
		// A non-EOF error means the stat file is corrupt or unreadable;
		// fail explicitly rather than dereferencing a nil prop.
		require.NoError(t, err)
		require.GreaterOrEqual(t, prop.offset, lastOffset)
		lastOffset = prop.offset
	}
}
5 changes: 3 additions & 2 deletions br/pkg/lightning/backend/external/sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func TestGlobalSortLocalWithMerge(t *testing.T) {
startKey = BytesMin(startKey, s.Min.Clone())
endKey = BytesMax(endKey, s.Max.Clone().Next())
}

mergeMemSize := (rand.Intn(10) + 1) * 100
Copy link
Collaborator

@Benjamin2037 Benjamin2037 Dec 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does this “mergeMemSize := (rand.Intn(10) + 1) * 100” mean, why here use a random func?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Randomizing memSize/blockSize makes the test cover more writer configurations.
When memSize is small enough, execution reaches line 109 in the one-file writer, which should reproduce the bug.

for _, group := range dataGroup {
require.NoError(t, MergeOverlappingFiles(
ctx,
Expand All @@ -157,7 +157,8 @@ func TestGlobalSortLocalWithMerge(t *testing.T) {
int64(5*size.MB),
100,
"/test2",
100,
mergeMemSize,
uint64(mergeMemSize),
8*1024,
100,
2,
Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/backfilling_merge_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
64*1024,
prefix,
external.DefaultBlockSize,
external.DefaultMemSizeLimit,
8*1024,
1*size.MB,
8*1024,
Expand Down
18 changes: 15 additions & 3 deletions pkg/disttask/importinto/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,9 +309,21 @@ func (m *mergeSortStepExecutor) RunSubtask(ctx context.Context, subtask *proto.S

logger.Info("merge sort partSize", zap.String("size", units.BytesSize(float64(m.partSize))))

return external.MergeOverlappingFiles(ctx, sm.DataFiles, m.controller.GlobalSortStore, m.partSize, 64*1024,
prefix, getKVGroupBlockSize(sm.KVGroup), 8*1024, 1*size.MB, 8*1024,
onClose, int(m.taskMeta.Plan.ThreadCnt), false)
return external.MergeOverlappingFiles(
ctx,
sm.DataFiles,
m.controller.GlobalSortStore,
m.partSize,
64*1024,
prefix,
getKVGroupBlockSize(sm.KVGroup),
external.DefaultMemSizeLimit,
8*1024,
1*size.MB,
8*1024,
onClose,
int(m.taskMeta.Plan.ThreadCnt),
false)
}

func (m *mergeSortStepExecutor) OnFinished(_ context.Context, subtask *proto.Subtask) error {
Expand Down
Loading