Skip to content

Commit

Permalink
Merge branch 'master' into reproduce-lock-not-found
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta authored Dec 14, 2022
2 parents 0885d48 + bb50e33 commit ddada0a
Show file tree
Hide file tree
Showing 144 changed files with 3,483 additions and 927 deletions.
14 changes: 14 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
version: 2
updates:
- package-ecosystem: "gomod"
directory: "/"
schedule:
interval: "weekly"
day: "friday"
time: "18:00"
timezone: "Asia/Shanghai"
allow:
- dependency-name: "golang.org/*"
- dependency-name: "github.com/golangci/golangci-lint"
open-pull-requests-limit: 2

105 changes: 0 additions & 105 deletions bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1388,108 +1388,3 @@ func TestDropBindBySQLDigest(t *testing.T) {
tk.MustGetErrMsg(fmt.Sprintf("drop binding for sql digest '%s'", "1"), "can't find any binding for '1'")
tk.MustGetErrMsg(fmt.Sprintf("drop binding for sql digest '%s'", ""), "sql digest is empty")
}

func TestCreateBindingFromHistory(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t1(id int primary key, a int, b int, key(a))")
tk.MustExec("create table t2(id int primary key, a int, b int, key(a))")

var testCases = []struct {
sqls []string
hint string
}{
{
sqls: []string{
"select %s * from t1, t2 where t1.id = t2.id",
"select %s * from test.t1, t2 where t1.id = t2.id",
"select %s * from test.t1, test.t2 where t1.id = t2.id",
"select %s * from t1, test.t2 where t1.id = t2.id",
},
hint: "/*+ merge_join(t1, t2) */",
},
{
sqls: []string{
"select %s * from t1 where a = 1",
"select %s * from test.t1 where a = 1",
},
hint: "/*+ ignore_index(t, a) */",
},
}

for _, testCase := range testCases {
for _, bind := range testCase.sqls {
stmtsummary.StmtSummaryByDigestMap.Clear()
bindSQL := fmt.Sprintf(bind, testCase.hint)
tk.MustExec(bindSQL)
planDigest := tk.MustQuery(fmt.Sprintf("select plan_digest from information_schema.statements_summary where query_sample_text = '%s'", bindSQL)).Rows()
tk.MustExec(fmt.Sprintf("create session binding from history using plan digest '%s'", planDigest[0][0]))
showRes := tk.MustQuery("show bindings").Rows()
require.Equal(t, len(showRes), 1)
require.Equal(t, planDigest[0][0], showRes[0][10])
for _, sql := range testCase.sqls {
tk.MustExec(fmt.Sprintf(sql, ""))
tk.MustQuery("select @@last_plan_from_binding").Check(testkit.Rows("1"))
}
}
showRes := tk.MustQuery("show bindings").Rows()
require.Equal(t, len(showRes), 1)
tk.MustExec(fmt.Sprintf("drop binding for sql digest '%s'", showRes[0][9]))
}

// exception cases
tk.MustGetErrMsg(fmt.Sprintf("create binding from history using plan digest '%s'", "1"), "can't find any plans for '1'")
tk.MustGetErrMsg(fmt.Sprintf("create binding from history using plan digest '%s'", ""), "plan digest is empty")
tk.MustExec("create binding for select * from t1, t2 where t1.id = t2.id using select /*+ merge_join(t1, t2) */ * from t1, t2 where t1.id = t2.id")
showRes := tk.MustQuery("show bindings").Rows()
require.Equal(t, showRes[0][10], "") // plan digest should be nil by create for
}

func TestCreateBindingForPrepareFromHistory(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(id int primary key, a int, key(a))")

tk.MustExec("prepare stmt from 'select /*+ ignore_index(t,a) */ * from t where a = ?'")
tk.MustExec("set @a = 1")
tk.MustExec("execute stmt using @a")
planDigest := tk.MustQuery(fmt.Sprintf("select plan_digest from information_schema.statements_summary where query_sample_text = '%s'", "select /*+ ignore_index(t,a) */ * from t where a = ? [arguments: 1]")).Rows()
showRes := tk.MustQuery("show bindings").Rows()
require.Equal(t, len(showRes), 0)
tk.MustExec(fmt.Sprintf("create binding from history using plan digest '%s'", planDigest[0][0]))
showRes = tk.MustQuery("show bindings").Rows()
require.Equal(t, len(showRes), 1)
require.Equal(t, planDigest[0][0], showRes[0][10])
tk.MustExec("execute stmt using @a")
tk.MustQuery("select @@last_plan_from_binding").Check(testkit.Rows("1"))
}

func TestErrorCasesCreateBindingFromHistory(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))

tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2, t3")
tk.MustExec("create table t1(id int)")
tk.MustExec("create table t2(id int)")
tk.MustExec("create table t3(id int)")

sql := "select * from t1 where t1.id in (select id from t2)"
tk.MustExec(sql)
planDigest := tk.MustQuery(fmt.Sprintf("select plan_digest from information_schema.statements_summary where query_sample_text = '%s'", sql)).Rows()
tk.MustGetErrMsg(fmt.Sprintf("create binding from history using plan digest '%s'", planDigest[0][0]), "can't create binding for query with sub query")

sql = "select * from t1, t2, t3 where t1.id = t2.id and t2.id = t3.id"
tk.MustExec(sql)
planDigest = tk.MustQuery(fmt.Sprintf("select plan_digest from information_schema.statements_summary where query_sample_text = '%s'", sql)).Rows()
tk.MustGetErrMsg(fmt.Sprintf("create binding from history using plan digest '%s'", planDigest[0][0]), "can't create binding for query with more than two table join")
}
2 changes: 1 addition & 1 deletion br/COMPATIBILITY_TEST.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## Background

We had some incompatibility issues in the past, which made BR cannot restore backed up data in some situations.
So we need a test workflow to check the compatiblity.
So we need a test workflow to check the compatibility.

## Goal

Expand Down
14 changes: 13 additions & 1 deletion br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,13 @@ func openDuplicateDB(storeDir string) (*pebble.DB, error) {
return pebble.Open(dbPath, opts)
}

var (
// RunInTest indicates whether the current process is running in test.
RunInTest bool
// LastAlloc is the last ID allocator.
LastAlloc manual.Allocator
)

// NewLocalBackend creates new connections to tikv.
func NewLocalBackend(
ctx context.Context,
Expand Down Expand Up @@ -461,6 +468,11 @@ func NewLocalBackend(
} else {
writeLimiter = noopStoreWriteLimiter{}
}
alloc := manual.Allocator{}
if RunInTest {
alloc.RefCnt = new(atomic.Int64)
LastAlloc = alloc
}
local := &local{
engines: sync.Map{},
pdCtl: pdCtl,
Expand All @@ -486,7 +498,7 @@ func NewLocalBackend(
keyAdapter: keyAdapter,
errorMgr: errorMgr,
importClientFactory: importClientFactory,
bufferPool: membuf.NewPool(membuf.WithAllocator(manual.Allocator{})),
bufferPool: membuf.NewPool(membuf.WithAllocator(alloc)),
writeLimiter: writeLimiter,
logger: log.FromContext(ctx),
encBuilder: NewEncodingBuilder(ctx),
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/manual/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ go_library(
cgo = True,
importpath = "github.com/pingcap/tidb/br/pkg/lightning/manual",
visibility = ["//visibility:public"],
deps = ["@org_uber_go_atomic//:atomic"],
)
31 changes: 28 additions & 3 deletions br/pkg/lightning/manual/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,33 @@

package manual

type Allocator struct{}
import (
"fmt"

func (Allocator) Alloc(n int) []byte { return New(n) }
"go.uber.org/atomic"
)

func (Allocator) Free(b []byte) { Free(b) }
type Allocator struct {
RefCnt *atomic.Int64
}

func (a Allocator) Alloc(n int) []byte {
if a.RefCnt != nil {
a.RefCnt.Add(1)
}
return New(n)
}

func (a Allocator) Free(b []byte) {
if a.RefCnt != nil {
a.RefCnt.Add(-1)
}
Free(b)
}

func (a Allocator) CheckRefCnt() error {
if a.RefCnt != nil && a.RefCnt.Load() != 0 {
return fmt.Errorf("memory leak detected, refCnt: %d", a.RefCnt.Load())
}
return nil
}
5 changes: 2 additions & 3 deletions br/pkg/storage/memstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"bytes"
"context"
"io"
"io/ioutil"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -70,7 +69,7 @@ func TestMemStoreBasic(t *testing.T) {
require.Nil(t, err)
r2, err := store.Open(ctx, "/hello.txt")
require.Nil(t, err)
fileContent, err = ioutil.ReadAll(r)
fileContent, err = io.ReadAll(r)
require.Nil(t, err)
require.True(t, bytes.Equal([]byte("hello world 3"), fileContent))
require.Nil(t, r.Close())
Expand All @@ -83,7 +82,7 @@ func TestMemStoreBasic(t *testing.T) {

_, err = r2.Seek(5, io.SeekStart)
require.Nil(t, err)
fileContent, err = ioutil.ReadAll(r2)
fileContent, err = io.ReadAll(r2)
require.Nil(t, err)
require.True(t, bytes.Equal([]byte(" world 3"), fileContent))

Expand Down
87 changes: 61 additions & 26 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -285,14 +286,18 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
c.task = e.Info
c.taskRange = spans.Collapse(len(e.Ranges), func(i int) kv.KeyRange { return e.Ranges[i] })
c.checkpoints = spans.Sorted(spans.NewFullWith(e.Ranges, 0))
c.lastCheckpoint = e.Info.StartTs
log.Info("added event", zap.Stringer("task", e.Info), zap.Stringer("ranges", logutil.StringifyKeys(c.taskRange)))
case EventDel:
utils.LogBackupTaskCountDec()
c.task = nil
c.taskRange = nil
c.checkpoints = nil
// This would be synced by `taskMu`, perhaps we'd better rename that to `tickMu`.
c.subscriber.Clear()
// Do the null check because some of test cases won't equip the advancer with subscriber.
if c.subscriber != nil {
c.subscriber.Clear()
}
if err := c.env.ClearV3GlobalCheckpointForTask(ctx, e.Name); err != nil {
log.Warn("failed to clear global checkpoint", logutil.ShortError(err))
}
Expand All @@ -303,31 +308,34 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
return nil
}

func (c *CheckpointAdvancer) setCheckpoint(cp uint64) bool {
if cp < c.lastCheckpoint {
log.Warn("failed to update global checkpoint: stale", zap.Uint64("old", c.lastCheckpoint), zap.Uint64("new", cp))
return false
}
if cp <= c.lastCheckpoint {
return false
}
c.lastCheckpoint = cp
return true
}

// advanceCheckpointBy advances the checkpoint by a checkpoint getter function.
func (c *CheckpointAdvancer) advanceCheckpointBy(ctx context.Context, getCheckpoint func(context.Context) (uint64, error)) error {
start := time.Now()
cp, err := getCheckpoint(ctx)
if err != nil {
return err
}
log.Info("get checkpoint", zap.Uint64("old", c.lastCheckpoint), zap.Uint64("new", cp))
if cp < c.lastCheckpoint {
log.Warn("failed to update global checkpoint: stale", zap.Uint64("old", c.lastCheckpoint), zap.Uint64("new", cp))
}
if cp <= c.lastCheckpoint {
return nil
}

log.Info("uploading checkpoint for task",
zap.Stringer("checkpoint", oracle.GetTimeFromTS(cp)),
zap.Uint64("checkpoint", cp),
zap.String("task", c.task.Name),
zap.Stringer("take", time.Since(start)))
if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, cp); err != nil {
return errors.Annotate(err, "failed to upload global checkpoint")
if c.setCheckpoint(cp) {
log.Info("uploading checkpoint for task",
zap.Stringer("checkpoint", oracle.GetTimeFromTS(cp)),
zap.Uint64("checkpoint", cp),
zap.String("task", c.task.Name),
zap.Stringer("take", time.Since(start)))
metrics.LastCheckpoint.WithLabelValues(c.task.GetName()).Set(float64(c.lastCheckpoint))
}
c.lastCheckpoint = cp
metrics.LastCheckpoint.WithLabelValues(c.task.GetName()).Set(float64(c.lastCheckpoint))
return nil
}

Expand Down Expand Up @@ -375,16 +383,17 @@ func (c *CheckpointAdvancer) subscribeTick(ctx context.Context) error {
return c.subscriber.PendingErrors()
}

func (c *CheckpointAdvancer) tick(ctx context.Context) error {
c.taskMu.Lock()
defer c.taskMu.Unlock()
if c.task == nil {
log.Debug("No tasks yet, skipping advancing.")
return nil
func (c *CheckpointAdvancer) importantTick(ctx context.Context) error {
c.checkpointsMu.Lock()
c.setCheckpoint(c.checkpoints.MinValue())
c.checkpointsMu.Unlock()
if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, c.lastCheckpoint); err != nil {
return errors.Annotate(err, "failed to upload global checkpoint")
}
cx, cancel := context.WithTimeout(ctx, c.Config().TickTimeout())
defer cancel()
return nil
}

func (c *CheckpointAdvancer) optionalTick(cx context.Context) error {
threshold := c.Config().GetDefaultStartPollThreshold()
if err := c.subscribeTick(cx); err != nil {
log.Warn("[log backup advancer] Subscriber meet error, would polling the checkpoint.", logutil.ShortError(err))
Expand All @@ -397,6 +406,32 @@ func (c *CheckpointAdvancer) tick(ctx context.Context) error {
if err != nil {
return err
}

return nil
}

func (c *CheckpointAdvancer) tick(ctx context.Context) error {
c.taskMu.Lock()
defer c.taskMu.Unlock()
if c.task == nil {
log.Debug("No tasks yet, skipping advancing.")
return nil
}

var errs error

cx, cancel := context.WithTimeout(ctx, c.Config().TickTimeout())
defer cancel()
err := c.optionalTick(cx)
if err != nil {
log.Warn("[log backup advancer] option tick failed.", logutil.ShortError(err))
errs = multierr.Append(errs, err)
}

err = c.importantTick(ctx)
if err != nil {
log.Warn("[log backup advancer] important tick failed.", logutil.ShortError(err))
errs = multierr.Append(errs, err)
}

return errs
}
Loading

0 comments on commit ddada0a

Please sign in to comment.