Skip to content

Commit

Permalink
Merge branch 'master' into memory-tuner-1.1
Browse files Browse the repository at this point in the history
  • Loading branch information
wshwsh12 authored Nov 28, 2022
2 parents 83cc0b6 + 0e6364f commit 1e4ee9b
Show file tree
Hide file tree
Showing 132 changed files with 13,441 additions and 9,795 deletions.
52 changes: 30 additions & 22 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -760,8 +760,8 @@ def go_deps():
name = "com_github_dgraph_io_ristretto",
build_file_proto_mode = "disable_global",
importpath = "github.com/dgraph-io/ristretto",
sum = "h1:Wrc3UKTS+cffkOx0xRGFC+ZesNuTfn0ThvEC72N0krk=",
version = "v0.1.1-0.20220403145359-8e850b710d6d",
sum = "h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8=",
version = "v0.1.1",
)
go_repository(
name = "com_github_dgrijalva_jwt_go",
Expand Down Expand Up @@ -2880,8 +2880,8 @@ def go_deps():
name = "com_github_pingcap_check",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/check",
sum = "h1:iRtOAQ6FXkY/BGvst3CDfTva4nTqh6CL8WXvanLdbu0=",
version = "v0.0.0-20191107115940-caf2b9e6ccf4",
sum = "h1:R8gStypOBmpnHEx1qi//SaqxJVI4inOqljg/Aj5/390=",
version = "v0.0.0-20200212061837-5e12011dc712",
)
go_repository(
name = "com_github_pingcap_errors",
Expand Down Expand Up @@ -2915,8 +2915,8 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sum = "h1:HyWSOT/drBEtfXK2HLkWWR8dCO+rcf7OiRDRhBxAfU4=",
version = "v0.0.0-20221114102356-3debb6820e46",
sum = "h1:hnUlIU5nCH6PAO9DC5DhODX1cwqoTcXTNIODyvNI9q4=",
version = "v0.0.0-20221123043343-cdc67325f05f",
)
go_repository(
name = "com_github_pingcap_log",
Expand Down Expand Up @@ -3375,8 +3375,8 @@ def go_deps():
name = "com_github_spaolacci_murmur3",
build_file_proto_mode = "disable_global",
importpath = "github.com/spaolacci/murmur3",
sum = "h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=",
version = "v0.0.0-20180118202830-f09979ecbc72",
sum = "h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=",
version = "v1.1.0",
)
go_repository(
name = "com_github_spf13_afero",
Expand Down Expand Up @@ -3519,8 +3519,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:nFVdyTXcQYZwQQCdSJcFI1vBFyzG1hVuZ39MAK6wqK4=",
version = "v2.0.3-0.20221108030801-9c0835c80eba",
sum = "h1:5FFJAKukKDTsLqrEeeDgC89aDAteGEFXBHwKRa3wnnQ=",
version = "v2.0.3-0.20221125022819-f05c6886bbad",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down Expand Up @@ -3861,6 +3861,14 @@ def go_deps():
sum = "h1:b1zWmYuuHz7gO9kDcM/EpHGr06UgsYNRpNJzI2kFiLM=",
version = "v1.5.0",
)
go_repository(
name = "com_google_cloud_go_compute_metadata",
build_file_proto_mode = "disable",
importpath = "cloud.google.com/go/compute/metadata",
sum = "h1:nBbNSZyDpkNlo3DepaaLKVuO7ClyifSAmNloSCZrHnQ=",
version = "v0.2.0",
)

go_repository(
name = "com_google_cloud_go_datastore",
build_file_proto_mode = "disable_global",
Expand Down Expand Up @@ -4406,22 +4414,22 @@ def go_deps():
name = "org_golang_x_mod",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/mod",
sum = "h1:b9gGHsz9/HhJ3HF5DHQytPpuwocVTChQJK3AvoLRD5I=",
version = "v0.6.0",
sum = "h1:LapD9S96VoQRhi/GrNTqeBJFrUjs5UHCAtTlgwA5oZA=",
version = "v0.7.0",
)
go_repository(
name = "org_golang_x_net",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/net",
sum = "h1:hZ/3BUoy5aId7sCpA/Tc5lt8DkFgdVS2onTpJsZ/fl0=",
version = "v0.1.0",
sum = "h1:sZfSu1wtKLGlWI4ZZayP0ck9Y73K1ynO6gqzTdBVdPU=",
version = "v0.2.0",
)
go_repository(
name = "org_golang_x_oauth2",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/oauth2",
sum = "h1:OSnWWcOd/CtWQC2cYSBgbTSJv3ciqd8r54ySIW2y3RE=",
version = "v0.0.0-20220411215720-9780585627b5",
sum = "h1:GtQkldQ9m7yvzCL1V+LrYow3Khe0eJH0w7RbX/VbaIU=",
version = "v0.2.0",
)
go_repository(
name = "org_golang_x_sync",
Expand All @@ -4434,15 +4442,15 @@ def go_deps():
name = "org_golang_x_sys",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/sys",
sum = "h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=",
version = "v0.1.0",
sum = "h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A=",
version = "v0.2.0",
)
go_repository(
name = "org_golang_x_term",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/term",
sum = "h1:g6Z6vPFA9dYBAF7DWcH6sCcOntplXsDKcliusYijMlw=",
version = "v0.1.0",
sum = "h1:z85xZCsEl7bi/KwbNADeBYoOP0++7W1ipu+aGnpwzRM=",
version = "v0.2.0",
)
go_repository(
name = "org_golang_x_text",
Expand All @@ -4455,8 +4463,8 @@ def go_deps():
name = "org_golang_x_time",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/time",
sum = "h1:xYY+Bajn2a7VBmTM5GikTmnK8ZuX8YgnQCqZpbBNtmA=",
version = "v0.1.0",
sum = "h1:52I/1L54xyEQAYdtcSuxtiT84KGYTBGXwayxmIpNJhE=",
version = "v0.2.0",
)
go_repository(
name = "org_golang_x_tools",
Expand Down
6 changes: 3 additions & 3 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")

http_archive(
name = "io_bazel_rules_go",
sha256 = "099a9fb96a376ccbbb7d291ed4ecbdfd42f6bc822ab77ae6f1b5cb9e914e94fa",
sha256 = "ae013bf35bd23234d1dea46b079f1e05ba74ac0321423830119d3e787ec73483",
urls = [
"https://mirror.bazel.build/github.com/bazelbuild/rules_go/releases/download/v0.35.0/rules_go-v0.35.0.zip",
"https://github.com/bazelbuild/rules_go/releases/download/v0.35.0/rules_go-v0.35.0.zip",
"https://mirror.bazel.build/github.com/bazelbuild/rules_go/releases/download/v0.36.0/rules_go-v0.36.0.zip",
"https://github.com/bazelbuild/rules_go/releases/download/v0.36.0/rules_go-v0.36.0.zip",
],
)

Expand Down
1 change: 1 addition & 0 deletions bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -1121,6 +1121,7 @@ func (h *BindHandle) getRunningDuration(sctx sessionctx.Context, db, sql string,
}
ctx, cancelFunc := context.WithCancel(ctx)
timer := time.NewTimer(maxTime)
defer timer.Stop()
resultChan := make(chan error)
startTime := time.Now()
go runSQL(ctx, sctx, sql, resultChan)
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/lightning/backend/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,14 @@ go_test(
name = "kv_test",
timeout = "short",
srcs = [
"session_internal_test.go",
"session_test.go",
"sql2kv_test.go",
],
embed = [":kv"],
flaky = True,
race = "on",
deps = [
":kv",
"//br/pkg/lightning/common",
"//br/pkg/lightning/log",
"//br/pkg/lightning/verification",
Expand All @@ -69,6 +70,7 @@ go_test(
"//tablecodec",
"//types",
"//util/mock",
"@com_github_docker_go_units//:go-units",
"@com_github_stretchr_testify//require",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
Expand Down
24 changes: 22 additions & 2 deletions br/pkg/lightning/backend/kv/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"go.uber.org/zap"
)

const maxAvailableBufSize int = 20

// invalidIterator is a trimmed down Iterator type which is invalid.
type invalidIterator struct {
kv.Iterator
Expand Down Expand Up @@ -92,15 +94,33 @@ func (mb *kvMemBuf) Recycle(buf *bytesBuf) {
buf.idx = 0
buf.cap = len(buf.buf)
mb.Lock()
if len(mb.availableBufs) >= maxAvailableBufSize {
// too many byte buffers, evict one byte buffer and continue
evictedByteBuf := mb.availableBufs[0]
evictedByteBuf.destroy()
mb.availableBufs = mb.availableBufs[1:]
}
mb.availableBufs = append(mb.availableBufs, buf)
mb.Unlock()
}

func (mb *kvMemBuf) AllocateBuf(size int) {
mb.Lock()
size = mathutil.Max(units.MiB, int(utils.NextPowerOfTwo(int64(size)))*2)
if len(mb.availableBufs) > 0 && mb.availableBufs[0].cap >= size {
mb.buf = mb.availableBufs[0]
var (
existingBuf *bytesBuf
existingBufIdx int
)
for i, buf := range mb.availableBufs {
if buf.cap >= size {
existingBuf = buf
existingBufIdx = i
break
}
}
if existingBuf != nil {
mb.buf = existingBuf
mb.availableBufs[existingBufIdx] = mb.availableBufs[0]
mb.availableBufs = mb.availableBufs[1:]
} else {
mb.buf = newBytesBuf(size)
Expand Down
126 changes: 126 additions & 0 deletions br/pkg/lightning/backend/kv/session_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kv

import (
"testing"

"github.com/docker/go-units"
"github.com/stretchr/testify/require"
)

func TestKVMemBufInterweaveAllocAndRecycle(t *testing.T) {
type testCase struct {
AllocSizes []int
FinalAvailableByteBufCaps []int
}
for _, tc := range []testCase{
{
AllocSizes: []int{
1 * units.MiB,
2 * units.MiB,
3 * units.MiB,
4 * units.MiB,
5 * units.MiB,
},
// [2] => [2,4] => [2,4,8] => [4,2,8] => [4,2,8,16]
FinalAvailableByteBufCaps: []int{
4 * units.MiB,
2 * units.MiB,
8 * units.MiB,
16 * units.MiB,
},
},
{
AllocSizes: []int{
5 * units.MiB,
4 * units.MiB,
3 * units.MiB,
2 * units.MiB,
1 * units.MiB,
},
// [16] => [16] => [16] => [16] => [16]
FinalAvailableByteBufCaps: []int{16 * units.MiB},
},
{
AllocSizes: []int{5, 4, 3, 2, 1},
// [1] => [1] => [1] => [1] => [1]
FinalAvailableByteBufCaps: []int{1 * units.MiB},
},
{
AllocSizes: []int{
1 * units.MiB,
2 * units.MiB,
3 * units.MiB,
2 * units.MiB,
1 * units.MiB,
5 * units.MiB,
},
// [2] => [2,4] => [2,4,8] => [2,8,4] => [8,4,2] => [8,4,2,16]
FinalAvailableByteBufCaps: []int{
8 * units.MiB,
4 * units.MiB,
2 * units.MiB,
16 * units.MiB,
},
},
} {
testKVMemBuf := &kvMemBuf{}
for _, allocSize := range tc.AllocSizes {
testKVMemBuf.AllocateBuf(allocSize)
testKVMemBuf.Recycle(testKVMemBuf.buf)
}
require.Equal(t, len(tc.FinalAvailableByteBufCaps), len(testKVMemBuf.availableBufs))
for i, bb := range testKVMemBuf.availableBufs {
require.Equal(t, tc.FinalAvailableByteBufCaps[i], bb.cap)
}
}
}

func TestKVMemBufBatchAllocAndRecycle(t *testing.T) {
type testCase struct {
AllocSizes []int
FinalAvailableByteBufCaps []int
}
testKVMemBuf := &kvMemBuf{}
bBufs := []*bytesBuf{}
for i := 0; i < maxAvailableBufSize; i++ {
testKVMemBuf.AllocateBuf(1 * units.MiB)
bBufs = append(bBufs, testKVMemBuf.buf)
}
for i := 0; i < maxAvailableBufSize; i++ {
testKVMemBuf.AllocateBuf(2 * units.MiB)
bBufs = append(bBufs, testKVMemBuf.buf)
}
for _, bb := range bBufs {
testKVMemBuf.Recycle(bb)
}
require.Equal(t, maxAvailableBufSize, len(testKVMemBuf.availableBufs))
for _, bb := range testKVMemBuf.availableBufs {
require.Equal(t, 4*units.MiB, bb.cap)
}
bBufs = bBufs[:0]
for i := 0; i < maxAvailableBufSize; i++ {
testKVMemBuf.AllocateBuf(1 * units.MiB)
bb := testKVMemBuf.buf
require.Equal(t, 4*units.MiB, bb.cap)
bBufs = append(bBufs, bb)
require.Equal(t, maxAvailableBufSize-i-1, len(testKVMemBuf.availableBufs))
}
for _, bb := range bBufs {
testKVMemBuf.Recycle(bb)
}
require.Equal(t, maxAvailableBufSize, len(testKVMemBuf.availableBufs))
}
2 changes: 1 addition & 1 deletion br/pkg/lightning/restore/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func newGCTTLManager(pdClient pd.Client) gcTTLManager {

func (m *gcTTLManager) addOneJob(ctx context.Context, table string, ts uint64) error {
// start gc ttl loop if not started yet.
if m.started.CAS(false, true) {
if m.started.CompareAndSwap(false, true) {
m.start(ctx)
}
m.lock.Lock()
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1841,7 +1841,7 @@ func (rc *Controller) fullCompact(ctx context.Context) error {

// wait until any existing level-1 compact to complete first.
task := log.FromContext(ctx).Begin(zap.InfoLevel, "wait for completion of existing level 1 compaction")
for !rc.compactState.CAS(compactStateIdle, compactStateDoing) {
for !rc.compactState.CompareAndSwap(compactStateIdle, compactStateDoing) {
time.Sleep(100 * time.Millisecond)
}
task.End(zap.ErrorLevel, nil)
Expand Down Expand Up @@ -1901,7 +1901,7 @@ func (rc *Controller) switchTiKVMode(ctx context.Context, mode sstpb.SwitchMode)
}

func (rc *Controller) enforceDiskQuota(ctx context.Context) {
if !rc.diskQuotaState.CAS(diskQuotaStateIdle, diskQuotaStateChecking) {
if !rc.diskQuotaState.CompareAndSwap(diskQuotaStateIdle, diskQuotaStateChecking) {
// do not run multiple the disk quota check / import simultaneously.
// (we execute the lock check in background to avoid blocking the cron thread)
return
Expand Down
Loading

0 comments on commit 1e4ee9b

Please sign in to comment.