Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into fix-47508
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Keao <[email protected]>
  • Loading branch information
YangKeao committed Oct 26, 2023
2 parents 915c74d + a7366f9 commit 88b1348
Show file tree
Hide file tree
Showing 228 changed files with 11,858 additions and 8,752 deletions.
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -7067,13 +7067,13 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "e4953948d2346bf26d95fcc860b612c7c1d86f07c80a754db2551067912d37c8",
strip_prefix = "github.com/tikv/client-go/[email protected].20231010061802-07432ef6c031",
sha256 = "1b4e8e9df95d2ed7ff41d756cd00c8159f0aa9483791b50af8afebefa94e5b6c",
strip_prefix = "github.com/tikv/client-go/[email protected].20231025022411-cad314220659",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231010061802-07432ef6c031.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231010061802-07432ef6c031.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231010061802-07432ef6c031.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231010061802-07432ef6c031.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231025022411-cad314220659.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231025022411-cad314220659.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231025022411-cad314220659.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231025022411-cad314220659.zip",
],
)
go_repository(
Expand Down
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,9 @@ else
endif

enterprise-server:
@make init-submodule
@make enterprise-prepare
@make enterprise-server-build
$(MAKE) init-submodule
$(MAKE) enterprise-prepare
$(MAKE) enterprise-server-build

server_check:
ifeq ($(TARGET), "")
Expand Down
1 change: 1 addition & 0 deletions OWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ reviewers:
- fixdb
- fzzf678
- iamxy
- jiyfhust
- JmPotato
- js00070
- lamxTyler
Expand Down
8 changes: 8 additions & 0 deletions OWNERS_ALIASES
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,11 @@ aliases:
- lance6716
- tangenta
- wjhuang2016
sig-approvers-disttask: # approvers for disttask pkg
- Benjamin2037
- D3Hunter
- gmhdbjd
- lance6716
- tangenta
- wjhuang2016
- ywqzzy
4 changes: 3 additions & 1 deletion br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"engine.go",
"file.go",
"iter.go",
"kv_buf.go",
"kv_reader.go",
"merge.go",
"split.go",
Expand Down Expand Up @@ -53,13 +54,14 @@ go_test(
"engine_test.go",
"file_test.go",
"iter_test.go",
"kv_buf_test.go",
"split_test.go",
"util_test.go",
"writer_test.go",
],
embed = [":external"],
flaky = True,
shard_count = 40,
shard_count = 41,
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
Expand Down
44 changes: 11 additions & 33 deletions br/pkg/lightning/backend/external/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
)

// we use uint64 to store the length of key and value.
const lengthBytes = 8

// KeyValueStore stores key-value pairs and maintains the range properties.
type KeyValueStore struct {
dataWriter storage.ExternalFileWriter
Expand All @@ -46,51 +49,26 @@ func NewKeyValueStore(
return kvStore, nil
}

// AddKeyValue saves a key-value pair to the KeyValueStore. If the accumulated
// addEncodedData saves encoded key-value pairs to the KeyValueStore.
// data layout: keyLen + key + valueLen + value. If the accumulated
// size or key count exceeds the given distance, a new range property will be
// appended to the rangePropertiesCollector with current status.
// `key` must be in strictly ascending order for invocations of a KeyValueStore.
func (s *KeyValueStore) AddKeyValue(key, value []byte) error {
var (
b [8]byte
kvLen = 0
)

// data layout: keyLen + key + valueLen + value
n, err := s.dataWriter.Write(
s.ctx,
binary.BigEndian.AppendUint64(b[:0], uint64(len(key))),
)
if err != nil {
return err
}
kvLen += n
n, err = s.dataWriter.Write(s.ctx, key)
if err != nil {
return err
}
kvLen += n
n, err = s.dataWriter.Write(
s.ctx,
binary.BigEndian.AppendUint64(b[:0], uint64(len(value))),
)
if err != nil {
return err
}
kvLen += n
n, err = s.dataWriter.Write(s.ctx, value)
func (s *KeyValueStore) addEncodedData(val []byte) error {
_, err := s.dataWriter.Write(s.ctx, val)
if err != nil {
return err
}
kvLen += n

keyLen := binary.BigEndian.Uint64(val)
key := val[lengthBytes : lengthBytes+keyLen]
if len(s.rc.currProp.firstKey) == 0 {
s.rc.currProp.firstKey = key
}
s.rc.currProp.lastKey = key

s.offset += uint64(kvLen)
s.rc.currProp.size += uint64(len(key) + len(value))
s.offset += uint64(len(val))
s.rc.currProp.size += uint64(len(val) - lengthBytes*2)
s.rc.currProp.keys++

if s.rc.currProp.size >= s.rc.propSizeDist ||
Expand Down
12 changes: 6 additions & 6 deletions br/pkg/lightning/backend/external/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) {
require.Len(t, encoded, 0)

k1, v1 := []byte("key1"), []byte("value1")
err = kvStore.AddKeyValue(k1, v1)
err = kvStore.addEncodedData(getEncodedData(k1, v1))
require.NoError(t, err)
// when not accumulated enough data, no range property will be added.
require.Equal(t, &initRC, rc)

// propKeysDist = 2, so after adding 2 keys, a new range property will be added.
k2, v2 := []byte("key2"), []byte("value2")
err = kvStore.AddKeyValue(k2, v2)
err = kvStore.addEncodedData(getEncodedData(k2, v2))
require.NoError(t, err)
require.Len(t, rc.props, 1)
expected := &rangeProperty{
Expand All @@ -67,7 +67,7 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) {

// when not accumulated enough data, no range property will be added.
k3, v3 := []byte("key3"), []byte("value3")
err = kvStore.AddKeyValue(k3, v3)
err = kvStore.addEncodedData(getEncodedData(k3, v3))
require.NoError(t, err)
require.Len(t, rc.props, 1)

Expand All @@ -93,7 +93,7 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) {
rc.reset()
kvStore, err = NewKeyValueStore(ctx, writer, rc)
require.NoError(t, err)
err = kvStore.AddKeyValue(k1, v1)
err = kvStore.addEncodedData(getEncodedData(k1, v1))
require.NoError(t, err)
require.Len(t, rc.props, 1)
expected = &rangeProperty{
Expand All @@ -105,7 +105,7 @@ func TestAddKeyValueMaintainRangeProperty(t *testing.T) {
}
require.Equal(t, expected, rc.props[0])

err = kvStore.AddKeyValue(k2, v2)
err = kvStore.addEncodedData(getEncodedData(k2, v2))
require.NoError(t, err)
require.Len(t, rc.props, 2)
expected = &rangeProperty{
Expand Down Expand Up @@ -149,7 +149,7 @@ func TestKVReadWrite(t *testing.T) {
randLen = rand.Intn(10) + 1
values[i] = make([]byte, randLen)
rand.Read(values[i])
err = kvStore.AddKeyValue(keys[i], values[i])
err = kvStore.addEncodedData(getEncodedData(keys[i], values[i]))
require.NoError(t, err)
}
err = writer.Close(ctx)
Expand Down
21 changes: 15 additions & 6 deletions br/pkg/lightning/backend/external/iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ func (r *trackOpenFileReader) Close() error {
return nil
}

func getEncodedData(key, value []byte) []byte {
buf := make([]byte, 8*2+len(key)+len(value))
binary.BigEndian.PutUint64(buf, uint64(len(key)))
copy(buf[8:], key)
binary.BigEndian.PutUint64(buf[8+len(key):], uint64(len(value)))
copy(buf[8*2+len(key):], value)
return buf
}

func TestMergeKVIter(t *testing.T) {
ctx := context.Background()
memStore := storage.NewMemStorage()
Expand All @@ -78,7 +87,7 @@ func TestMergeKVIter(t *testing.T) {
kvStore, err := NewKeyValueStore(ctx, writer, rc)
require.NoError(t, err)
for _, kv := range data[i] {
err = kvStore.AddKeyValue([]byte(kv[0]), []byte(kv[1]))
err = kvStore.addEncodedData(getEncodedData([]byte(kv[0]), []byte(kv[1])))
require.NoError(t, err)
}
err = writer.Close(ctx)
Expand Down Expand Up @@ -130,7 +139,7 @@ func TestOneUpstream(t *testing.T) {
kvStore, err := NewKeyValueStore(ctx, writer, rc)
require.NoError(t, err)
for _, kv := range data[i] {
err = kvStore.AddKeyValue([]byte(kv[0]), []byte(kv[1]))
err = kvStore.addEncodedData(getEncodedData([]byte(kv[0]), []byte(kv[1])))
require.NoError(t, err)
}
err = writer.Close(ctx)
Expand Down Expand Up @@ -208,7 +217,7 @@ func TestCorruptContent(t *testing.T) {
kvStore, err := NewKeyValueStore(ctx, writer, rc)
require.NoError(t, err)
for _, kv := range data[i] {
err = kvStore.AddKeyValue([]byte(kv[0]), []byte(kv[1]))
err = kvStore.addEncodedData(getEncodedData([]byte(kv[0]), []byte(kv[1])))
require.NoError(t, err)
}
if i == 0 {
Expand Down Expand Up @@ -345,7 +354,7 @@ func TestHotspot(t *testing.T) {
kvStore, err := NewKeyValueStore(ctx, writer, rc)
require.NoError(t, err)
for _, k := range keys[i] {
err = kvStore.AddKeyValue([]byte(k), value)
err = kvStore.addEncodedData(getEncodedData([]byte(k), value))
require.NoError(t, err)
}
err = writer.Close(ctx)
Expand Down Expand Up @@ -446,13 +455,13 @@ func TestMemoryUsageWhenHotspotChange(t *testing.T) {
for j := 0; j < checkHotspotPeriod; j++ {
key := fmt.Sprintf("key%06d", cur)
val := fmt.Sprintf("value%06d", cur)
err = kvStore.AddKeyValue([]byte(key), []byte(val))
err = kvStore.addEncodedData(getEncodedData([]byte(key), []byte(val)))
require.NoError(t, err)
cur++
}
for j := 0; j <= 12; j++ {
key := fmt.Sprintf("key999%06d", cur+j)
err = kvStore.AddKeyValue([]byte(key), largeChunk)
err = kvStore.addEncodedData(getEncodedData([]byte(key), largeChunk))
require.NoError(t, err)
}
err = writer.Close(ctx)
Expand Down
75 changes: 75 additions & 0 deletions br/pkg/lightning/backend/external/kv_buf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package external

import "github.com/docker/go-units"

// DefaultBlockSize is the default block size for preAllocKVBuf.
const DefaultBlockSize = 16 * units.MiB

// preAllocKVBuf pre allocates a large buffer of limit memLimit to reduce memory
// allocation, all space in this buffer will be reused when reset.
type preAllocKVBuf struct {
blocks [][]byte
blockSize int
curBlock []byte
curBlockIdx int
curIdx int
}

func newPreAllocKVBuf(memLimit uint64, blockSize int) *preAllocKVBuf {
blockCount := (memLimit + uint64(blockSize) - 1) / uint64(blockSize)
b := &preAllocKVBuf{
blocks: make([][]byte, 0, blockCount),
blockSize: blockSize,
}
for i := 0; i < int(blockCount); i++ {
b.blocks = append(b.blocks, make([]byte, blockSize))
}
b.reset()
return b
}

func (b *preAllocKVBuf) Alloc(s int) (blockIdx int32, res []byte, offset int32, allocated bool) {
if s > b.blockSize {
return
}
if b.blockSize-b.curIdx < s {
if b.curBlockIdx+1 >= len(b.blocks) {
return
}
b.curBlockIdx++
b.curBlock = b.blocks[b.curBlockIdx]
b.curIdx = 0
}
blockIdx = int32(b.curBlockIdx)
res = b.curBlock[b.curIdx : b.curIdx+s]
offset = int32(b.curIdx)
allocated = true

b.curIdx += s
return
}

func (b *preAllocKVBuf) reset() {
b.curBlockIdx = 0
b.curBlock = b.blocks[0]
b.curIdx = 0
}

func (b *preAllocKVBuf) destroy() {
b.blocks = nil
b.curBlock = nil
}
Loading

0 comments on commit 88b1348

Please sign in to comment.