Skip to content

Commit

Permalink
Merge branch 'master' into rustin-patch-partitioning
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 authored Dec 19, 2023
2 parents b64d0d0 + ad1efe4 commit 72d6b10
Show file tree
Hide file tree
Showing 54 changed files with 1,590 additions and 315 deletions.
36 changes: 18 additions & 18 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -6109,13 +6109,13 @@ def go_deps():
name = "com_github_prometheus_prometheus",
build_file_proto_mode = "disable_global",
importpath = "github.com/prometheus/prometheus",
sha256 = "57ac0b06c05da5d42f831e52250f3bc63d2fc6785cd9f21ca79534f1900aeb19",
strip_prefix = "github.com/prometheus/[email protected].0",
sha256 = "942dba743bc78a6933cc9c2fbcc3d1d301254d3fd343975476ecd73573866f6e",
strip_prefix = "github.com/prometheus/[email protected].1",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/prometheus/prometheus/com_github_prometheus_prometheus-v0.48.0.zip",
"http://ats.apps.svc/gomod/github.com/prometheus/prometheus/com_github_prometheus_prometheus-v0.48.0.zip",
"https://cache.hawkingrei.com/gomod/github.com/prometheus/prometheus/com_github_prometheus_prometheus-v0.48.0.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/prometheus/prometheus/com_github_prometheus_prometheus-v0.48.0.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/prometheus/prometheus/com_github_prometheus_prometheus-v0.48.1.zip",
"http://ats.apps.svc/gomod/github.com/prometheus/prometheus/com_github_prometheus_prometheus-v0.48.1.zip",
"https://cache.hawkingrei.com/gomod/github.com/prometheus/prometheus/com_github_prometheus_prometheus-v0.48.1.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/prometheus/prometheus/com_github_prometheus_prometheus-v0.48.1.zip",
],
)
go_repository(
Expand Down Expand Up @@ -7006,26 +7006,26 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "04da7d520727a9140c0d472c0f0a054837aae1da3fa49101c0f52279c7d78094",
strip_prefix = "github.com/tikv/client-go/[email protected].20231204074048-e80e9ca1fe66",
sha256 = "8ff835049b1a8bf797c8d38b7081d761e985893c9b78ac9264ccca7b428fddf7",
strip_prefix = "github.com/tikv/client-go/[email protected].20231219052137-6f9ba8327b75",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231204074048-e80e9ca1fe66.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231204074048-e80e9ca1fe66.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231204074048-e80e9ca1fe66.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231204074048-e80e9ca1fe66.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231219052137-6f9ba8327b75.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231219052137-6f9ba8327b75.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231219052137-6f9ba8327b75.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231219052137-6f9ba8327b75.zip",
],
)
go_repository(
name = "com_github_tikv_pd_client",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/pd/client",
sha256 = "5b31b38e151e03117ef9878c2dbac2b1f22c890e72ebb70935795ac5682c77c1",
strip_prefix = "github.com/tikv/pd/[email protected]20231213112719-f51f9134558e",
sha256 = "929a41d5e836a8ef04ed5bad9500023bd58cb17381388b9c4f1ae07ebc039287",
strip_prefix = "github.com/tikv/pd/[email protected]20231219031951-25f48f0bdd27",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231213112719-f51f9134558e.zip",
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231213112719-f51f9134558e.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231213112719-f51f9134558e.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231213112719-f51f9134558e.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231219031951-25f48f0bdd27.zip",
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231219031951-25f48f0bdd27.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231219031951-25f48f0bdd27.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231219031951-25f48f0bdd27.zip",
],
)
go_repository(
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
"iter.go",
"kv_reader.go",
"merge.go",
"merge_v2.go",
"onefile_writer.go",
"reader.go",
"split.go",
Expand All @@ -32,6 +33,7 @@ go_library(
"//br/pkg/storage",
"//pkg/kv",
"//pkg/metrics",
"//pkg/util",
"//pkg/util/hack",
"//pkg/util/logutil",
"//pkg/util/size",
Expand Down Expand Up @@ -73,6 +75,7 @@ go_test(
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
"//br/pkg/lightning/log",
"//br/pkg/membuf",
"//br/pkg/storage",
"//pkg/kv",
Expand All @@ -91,10 +94,12 @@ go_test(
"@com_github_johannesboyne_gofakes3//:gofakes3",
"@com_github_johannesboyne_gofakes3//backend/s3mem",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_stretchr_testify//require",
"@org_golang_x_exp//rand",
"@org_golang_x_sync//errgroup",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
)
5 changes: 2 additions & 3 deletions br/pkg/lightning/backend/external/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func writeExternalOneFile(s *writeTestSuite) {
}
writer := builder.BuildOneFile(
s.store, filePath, "writerID")
_ = writer.Init(ctx, 20*1024*1024)
intest.AssertNoError(writer.Init(ctx, 20*1024*1024))
key, val, _ := s.source.next()
for key != nil {
err := writer.WriteRow(ctx, key, val)
Expand All @@ -371,8 +371,7 @@ func writeExternalOneFile(s *writeTestSuite) {
if s.beforeWriterClose != nil {
s.beforeWriterClose()
}
err := writer.Close(ctx)
intest.AssertNoError(err)
intest.AssertNoError(writer.Close(ctx))
if s.afterWriterClose != nil {
s.afterWriterClose()
}
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/lightning/backend/external/byte_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ func (r *byteReader) readNBytes(n int) ([]byte, error) {
return bs[0], nil
}
// need to flatten bs
if n <= 0 {
return nil, errors.Errorf("illegal n (%d) when reading from external storage", n)
}
if n > int(size.GB) {
return nil, errors.Errorf("read %d bytes from external storage, exceed max limit %d", n, size.GB)
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/external/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func getFilesReadConcurrency(
for i := range statsFiles {
result[i] = (endOffs[i] - startOffs[i]) / uint64(ConcurrentReaderBufferSizePerConc)
result[i] = max(result[i], 1)
logutil.Logger(ctx).Info("found hotspot file in getFilesReadConcurrency",
logutil.Logger(ctx).Debug("found hotspot file in getFilesReadConcurrency",
zap.String("filename", statsFiles[i]),
zap.Uint64("startOffset", startOffs[i]),
zap.Uint64("endOffset", endOffs[i]),
Expand Down
66 changes: 3 additions & 63 deletions br/pkg/lightning/backend/external/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func MergeOverlappingFiles(
for _, files := range dataFilesSlice {
files := files
eg.Go(func() error {
return mergeOverlappingFilesV2(
return mergeOverlappingFilesInternal(
egCtx,
files,
store,
Expand All @@ -74,68 +74,9 @@ func MergeOverlappingFiles(
return eg.Wait()
}

// unused for now.
func mergeOverlappingFilesImpl(ctx context.Context,
paths []string,
store storage.ExternalStorage,
readBufferSize int,
newFilePrefix string,
writerID string,
memSizeLimit uint64,
blockSize int,
writeBatchCount uint64,
propSizeDist uint64,
propKeysDist uint64,
onClose OnCloseFunc,
checkHotspot bool,
) (err error) {
task := log.BeginTask(logutil.Logger(ctx).With(
zap.String("writer-id", writerID),
zap.Int("file-count", len(paths)),
), "merge overlapping files")
defer func() {
task.End(zap.ErrorLevel, err)
}()

zeroOffsets := make([]uint64, len(paths))
iter, err := NewMergeKVIter(ctx, paths, zeroOffsets, store, readBufferSize, checkHotspot, 0)
if err != nil {
return err
}
defer func() {
err := iter.Close()
if err != nil {
logutil.Logger(ctx).Warn("close iterator failed", zap.Error(err))
}
}()

writer := NewWriterBuilder().
SetMemorySizeLimit(memSizeLimit).
SetBlockSize(blockSize).
SetOnCloseFunc(onClose).
SetWriterBatchCount(writeBatchCount).
SetPropSizeDistance(propSizeDist).
SetPropKeysDistance(propKeysDist).
Build(store, newFilePrefix, writerID)

// currently use same goroutine to do read and write. The main advantage is
// there's no KV copy and iter can reuse the buffer.
for iter.Next() {
err = writer.WriteRow(ctx, iter.Key(), iter.Value(), nil)
if err != nil {
return err
}
}
err = iter.Error()
if err != nil {
return err
}
return writer.Close(ctx)
}

// mergeOverlappingFilesV2 reads from given files whose key range may overlap
// mergeOverlappingFilesInternal reads from given files whose key range may overlap
// and writes to one new sorted, nonoverlapping files.
func mergeOverlappingFilesV2(
func mergeOverlappingFilesInternal(
ctx context.Context,
paths []string,
store storage.ExternalStorage,
Expand Down Expand Up @@ -177,7 +118,6 @@ func mergeOverlappingFilesV2(
SetWriterBatchCount(writeBatchCount).
SetPropKeysDistance(propKeysDist).
SetPropSizeDistance(propSizeDist).
SetOnCloseFunc(onClose).
BuildOneFile(store, newFilePrefix, writerID)
err = writer.Init(ctx, partSize)
if err != nil {
Expand Down
Loading

0 comments on commit 72d6b10

Please sign in to comment.