Merge branch 'master' into wenxuan/update-overview
breezewish authored Sep 30, 2022
2 parents ddedd26 + 8f18fce commit f103b6c
Showing 175 changed files with 15,366 additions and 10,749 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
@@ -4453,8 +4453,8 @@ def go_deps():
         name = "org_uber_go_goleak",
         build_file_proto_mode = "disable_global",
         importpath = "go.uber.org/goleak",
-        sum = "h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=",
-        version = "v1.1.12",
+        sum = "h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=",
+        version = "v1.2.0",
     )
     go_repository(
         name = "org_uber_go_multierr",
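
This bumps go.uber.org/goleak, the goroutine-leak detector used by the test suites, from v1.1.12 to v1.2.0. For context, typical goleak wiring looks like the sketch below (an illustration of the library's documented API, not code from this commit):

```go
package mypkg_test

import (
	"testing"

	"go.uber.org/goleak"
)

// TestMain makes goleak fail the package's tests if any goroutine
// is still running after all tests have finished.
func TestMain(m *testing.M) {
	goleak.VerifyTestMain(m)
}
```
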
2 changes: 1 addition & 1 deletion Dockerfile
@@ -13,7 +13,7 @@
 # limitations under the License.

 # Builder image
-FROM golang:1.19.1 as builder
+FROM golang:1.19.1-alpine as builder

 RUN apk add --no-cache \
     wget \
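
The -alpine suffix matters because the very next RUN step installs packages with apk, Alpine's package manager; the Debian-based golang:1.19.1 image has no apk, so the builder stage only works against an Alpine base.
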
2 changes: 1 addition & 1 deletion Makefile
@@ -31,7 +31,7 @@ dev: checklist check explaintest gogenerate br_unit_test test_part_parser_dev ut
 # Install the check tools.
 check-setup:tools/bin/revive

-check: check-parallel lint tidy testSuite errdoc bazel_all_build
+check: parser_yacc check-parallel lint tidy testSuite errdoc bazel_all_build

 fmt:
 	@echo "gofmt (simplify)"
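
Judging by the target name alone (an assumption, since the rest of the Makefile is not shown), parser_yacc regenerates or validates the goyacc-generated parser, so `make check` now catches grammar changes whose generated code was not refreshed.
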
4 changes: 2 additions & 2 deletions bindinfo/bind_test.go
@@ -352,8 +352,8 @@ func TestBindCTEMerge(t *testing.T) {
 	tk.MustExec("use test")
 	tk.MustExec("drop table if exists t1")
 	tk.MustExec("create table t1(id int)")
-	require.True(t, tk.HasPlan("with cte as (select * from t1) select * from cte", "CTEFullScan"))
-	require.False(t, tk.HasPlan("with cte as (select /*+ MERGE() */ * from t1) select * from cte", "CTEFullScan"))
+	require.True(t, tk.HasPlan("with cte as (select * from t1) select * from cte a, cte b", "CTEFullScan"))
+	require.False(t, tk.HasPlan("with cte as (select /*+ MERGE() */ * from t1) select * from cte a, cte b", "CTEFullScan"))
 	tk.MustExec(`
 create global binding for
 	with cte as (select * from t1) select * from cte
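
The bound queries now reference the CTE twice (`cte a, cte b`), presumably because a CTE referenced only once can be inlined by the planner regardless of hints; with two references the plan materializes the CTE (hence the expected CTEFullScan) unless the MERGE() hint forces inlining.
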
4 changes: 2 additions & 2 deletions bindinfo/capture_test.go
@@ -563,9 +563,9 @@ func TestIssue25505(t *testing.T) {
 	spmMap["with recursive `cte` ( `a` ) as ( select ? union select `a` + ? from `test` . `t1` where `a` > ? ) select * from `cte`"] =
 		"WITH RECURSIVE `cte` (`a`) AS (SELECT 2 UNION SELECT `a` + 1 FROM `test`.`t1` WHERE `a` > 5) SELECT /*+ hash_agg(@`sel_1`), use_index(@`sel_3` `test`.`t1` `idx_b`)*/ * FROM `cte`"
 	spmMap["with `cte` as ( with `cte1` as ( select * from `test` . `t2` where `a` > ? and `b` > ? ) select * from `cte1` ) select * from `cte` join `test` . `t1` on `t1` . `a` = `cte` . `a`"] =
-		"WITH `cte` AS (WITH `cte1` AS (SELECT * FROM `test`.`t2` WHERE `a` > 1 AND `b` > 1) SELECT * FROM `cte1`) SELECT /*+ inl_join(@`sel_1` `test`.`t1`), use_index(@`sel_1` `test`.`t1` `idx_ab`), use_index(@`sel_3` `test`.`t2` `idx_ab`)*/ * FROM `cte` JOIN `test`.`t1` ON `t1`.`a` = `cte`.`a`"
+		"WITH `cte` AS (WITH `cte1` AS (SELECT * FROM `test`.`t2` WHERE `a` > 1 AND `b` > 1) SELECT * FROM `cte1`) SELECT /*+ use_index(@`sel_3` `test`.`t2` `idx_ab`), use_index(@`sel_1` `test`.`t1` `idx_ab`)*/ * FROM `cte` JOIN `test`.`t1` ON `t1`.`a` = `cte`.`a`"
 	spmMap["with `cte` as ( with `cte1` as ( select * from `test` . `t2` where `a` = ? and `b` = ? ) select * from `cte1` ) select * from `cte` join `test` . `t1` on `t1` . `a` = `cte` . `a`"] =
-		"WITH `cte` AS (WITH `cte1` AS (SELECT * FROM `test`.`t2` WHERE `a` = 1 AND `b` = 1) SELECT * FROM `cte1`) SELECT /*+ inl_join(@`sel_1` `test`.`t1`), use_index(@`sel_1` `test`.`t1` `idx_a`), use_index(@`sel_3` `test`.`t2` `idx_a`)*/ * FROM `cte` JOIN `test`.`t1` ON `t1`.`a` = `cte`.`a`"
+		"WITH `cte` AS (WITH `cte1` AS (SELECT * FROM `test`.`t2` WHERE `a` = 1 AND `b` = 1) SELECT * FROM `cte1`) SELECT /*+ use_index(@`sel_3` `test`.`t2` `idx_a`), use_index(@`sel_1` `test`.`t1` `idx_a`)*/ * FROM `cte` JOIN `test`.`t1` ON `t1`.`a` = `cte`.`a`"

 	tk.MustExec("with cte as (with cte1 as (select /*+use_index(t2 idx_a)*/ * from t2 where a = 1 and b = 1) select * from cte1) select /*+use_index(t1 idx_a)*/ * from cte join t1 on t1.a=cte.a;")
 	tk.MustExec("with cte as (with cte1 as (select /*+use_index(t2 idx_a)*/ * from t2 where a = 1 and b = 1) select * from cte1) select /*+use_index(t1 idx_a)*/ * from cte join t1 on t1.a=cte.a;")
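
The two identical tk.MustExec calls are deliberate, not a copy-paste artifact: baseline capture only records a binding for statements that appear at least twice.
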
31 changes: 19 additions & 12 deletions bindinfo/handle.go
@@ -108,20 +108,27 @@ type bindRecordUpdate struct {
 // NewBindHandle creates a new BindHandle.
 func NewBindHandle(ctx sessionctx.Context) *BindHandle {
 	handle := &BindHandle{}
-	handle.sctx.Context = ctx
-	handle.bindInfo.Value.Store(newBindCache())
-	handle.bindInfo.parser = parser.New()
-	handle.invalidBindRecordMap.Value.Store(make(map[string]*bindRecordUpdate))
-	handle.invalidBindRecordMap.flushFunc = func(record *BindRecord) error {
-		return handle.DropBindRecord(record.OriginalSQL, record.Db, &record.Bindings[0])
-	}
-	handle.pendingVerifyBindRecordMap.Value.Store(make(map[string]*bindRecordUpdate))
-	handle.pendingVerifyBindRecordMap.flushFunc = func(record *BindRecord) error {
+	handle.Reset(ctx)
+	return handle
+}
+
+// Reset is to reset the BindHandle and clean old info.
+func (h *BindHandle) Reset(ctx sessionctx.Context) {
+	h.bindInfo.Lock()
+	defer h.bindInfo.Unlock()
+	h.sctx.Context = ctx
+	h.bindInfo.Value.Store(newBindCache())
+	h.bindInfo.parser = parser.New()
+	h.invalidBindRecordMap.Value.Store(make(map[string]*bindRecordUpdate))
+	h.invalidBindRecordMap.flushFunc = func(record *BindRecord) error {
+		return h.DropBindRecord(record.OriginalSQL, record.Db, &record.Bindings[0])
+	}
+	h.pendingVerifyBindRecordMap.Value.Store(make(map[string]*bindRecordUpdate))
+	h.pendingVerifyBindRecordMap.flushFunc = func(record *BindRecord) error {
 		// BindSQL has already been validated when coming here, so we use nil sctx parameter.
-		return handle.AddBindRecord(nil, record)
+		return h.AddBindRecord(nil, record)
 	}
-	variable.RegisterStatistics(handle)
-	return handle
+	variable.RegisterStatistics(h)
 }

 // Update updates the global sql bind cache.
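
The refactor funnels all of NewBindHandle's initialization through the new Reset method, so a live BindHandle can be re-initialized in place under its own lock instead of being rebuilt. A minimal sketch of the pattern, with hypothetical names:

```go
package bindcache

import "sync"

// Handle stands in for BindHandle in this sketch.
type Handle struct {
	mu    sync.Mutex
	cache map[string]string
}

// NewHandle funnels construction through the same code path that
// re-initializes an existing Handle.
func NewHandle() *Handle {
	h := &Handle{}
	h.Reset()
	return h
}

// Reset drops old state and rebuilds it under the lock, mirroring
// how BindHandle.Reset re-creates its bind cache.
func (h *Handle) Reset() {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.cache = make(map[string]string)
}
```
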
1 change: 1 addition & 0 deletions br/pkg/lightning/config/BUILD.bazel
@@ -42,6 +42,7 @@ go_test(
     flaky = True,
     deps = [
         ":config",
+        "//br/pkg/lightning/common",
         "//parser/mysql",
         "@com_github_burntsushi_toml//:toml",
         "@com_github_stretchr_testify//require",
13 changes: 10 additions & 3 deletions br/pkg/lightning/config/config.go
@@ -98,7 +98,7 @@
 var (
 	supportedStorageTypes = []string{"file", "local", "s3", "noop", "gcs", "gs"}

-	DefaultFilter = []string{
+	defaultFilter = []string{
 		"*.*",
 		"!mysql.*",
 		"!sys.*",
@@ -109,6 +109,13 @@
 	}
 )

+// GetDefaultFilter gets the default table filter used in Lightning.
+// It clones the original default filter,
+// so that the original value won't be changed when the returned slice's element is changed.
+func GetDefaultFilter() []string {
+	return append([]string{}, defaultFilter...)
+}
+
 type DBStore struct {
 	Host string `toml:"host" json:"host"`
 	Port int    `toml:"port" json:"port"`
@@ -715,7 +722,7 @@ func NewConfig() *Config {
 		},
 		StrictFormat:  false,
 		MaxRegionSize: MaxRegionSize,
-		Filter:        DefaultFilter,
+		Filter:        GetDefaultFilter(),
 		DataCharacterSet:       defaultCSVDataCharacterSet,
 		DataInvalidCharReplace: string(defaultCSVDataInvalidCharReplace),
 	},
@@ -890,7 +897,7 @@ func (cfg *Config) Adjust(ctx context.Context) error {
 	// mydumper.filter and black-white-list cannot co-exist.
 	if cfg.HasLegacyBlackWhiteList() {
 		log.L().Warn("the config `black-white-list` has been deprecated, please replace with `mydumper.filter`")
-		if !common.StringSliceEqual(cfg.Mydumper.Filter, DefaultFilter) {
+		if !common.StringSliceEqual(cfg.Mydumper.Filter, defaultFilter) {
 			return common.ErrInvalidConfig.GenWithStack("`mydumper.filter` and `black-white-list` cannot be simultaneously defined")
 		}
 	}
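
The defensive copy in GetDefaultFilter exists because a caller that mutates the returned slice would otherwise rewrite the package-level default for every later caller. A runnable sketch of the aliasing hazard (hypothetical names, not code from this commit):

```go
package main

import "fmt"

var defaultFilter = []string{"*.*", "!mysql.*"}

// badDefaultFilter hands out the package-level slice itself, so all
// callers share (and can corrupt) the default.
func badDefaultFilter() []string { return defaultFilter }

// getDefaultFilter returns a fresh copy, as the commit does with
// append([]string{}, defaultFilter...).
func getDefaultFilter() []string { return append([]string{}, defaultFilter...) }

func main() {
	f := badDefaultFilter()
	f[0] = "db1.tbl1"          // silently rewrites the shared default
	fmt.Println(defaultFilter) // [db1.tbl1 !mysql.*]

	defaultFilter[0] = "*.*" // restore for the second half of the demo
	g := getDefaultFilter()
	g[0] = "db1.tbl1"          // only the copy changes
	fmt.Println(defaultFilter) // [*.* !mysql.*]
}
```
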
43 changes: 41 additions & 2 deletions br/pkg/lightning/config/config_test.go
@@ -30,6 +30,7 @@ import (
 	"time"

 	"github.com/BurntSushi/toml"
+	"github.com/pingcap/tidb/br/pkg/lightning/common"
 	"github.com/pingcap/tidb/br/pkg/lightning/config"
 	"github.com/pingcap/tidb/parser/mysql"
 	"github.com/stretchr/testify/require"
@@ -750,7 +751,7 @@ func TestCronEncodeDecode(t *testing.T) {
 func TestAdjustWithLegacyBlackWhiteList(t *testing.T) {
 	cfg := config.NewConfig()
 	assignMinimalLegalValue(cfg)
-	require.Equal(t, config.DefaultFilter, cfg.Mydumper.Filter)
+	require.Equal(t, config.GetDefaultFilter(), cfg.Mydumper.Filter)
 	require.False(t, cfg.HasLegacyBlackWhiteList())

 	ctx := context.Background()
@@ -762,7 +763,7 @@ func TestAdjustWithLegacyBlackWhiteList(t *testing.T) {
 	cfg.BWList.DoDBs = []string{"test"}
 	require.EqualError(t, cfg.Adjust(ctx), "[Lightning:Config:ErrInvalidConfig]`mydumper.filter` and `black-white-list` cannot be simultaneously defined")

-	cfg.Mydumper.Filter = config.DefaultFilter
+	cfg.Mydumper.Filter = config.GetDefaultFilter()
 	require.NoError(t, cfg.Adjust(ctx))
 	require.True(t, cfg.HasLegacyBlackWhiteList())
 }
@@ -955,3 +956,41 @@ func TestCheckAndAdjustForLocalBackend(t *testing.T) {
 	cfg.TikvImporter.SortedKVDir = base
 	require.NoError(t, cfg.CheckAndAdjustForLocalBackend())
 }
+
+func TestCreateSeveralConfigsWithDifferentFilters(t *testing.T) {
+	originalDefaultCfg := append([]string{}, config.GetDefaultFilter()...)
+	cfg1 := config.NewConfig()
+	require.NoError(t, cfg1.LoadFromTOML([]byte(`
+[mydumper]
+filter = ["db1.tbl1", "db2.*", "!db2.tbl1"]
+`)))
+	require.Equal(t, 3, len(cfg1.Mydumper.Filter))
+	require.True(t, common.StringSliceEqual(
+		cfg1.Mydumper.Filter,
+		[]string{"db1.tbl1", "db2.*", "!db2.tbl1"},
+	))
+	require.True(t, common.StringSliceEqual(config.GetDefaultFilter(), originalDefaultCfg))
+
+	cfg2 := config.NewConfig()
+	require.True(t, common.StringSliceEqual(
+		cfg2.Mydumper.Filter,
+		originalDefaultCfg,
+	))
+	require.True(t, common.StringSliceEqual(config.GetDefaultFilter(), originalDefaultCfg))
+
+	gCfg1, err := config.LoadGlobalConfig([]string{"-f", "db1.tbl1", "-f", "db2.*", "-f", "!db2.tbl1"}, nil)
+	require.NoError(t, err)
+	require.True(t, common.StringSliceEqual(
+		gCfg1.Mydumper.Filter,
+		[]string{"db1.tbl1", "db2.*", "!db2.tbl1"},
+	))
+	require.True(t, common.StringSliceEqual(config.GetDefaultFilter(), originalDefaultCfg))
+
+	gCfg2, err := config.LoadGlobalConfig([]string{}, nil)
+	require.NoError(t, err)
+	require.True(t, common.StringSliceEqual(
+		gCfg2.Mydumper.Filter,
+		originalDefaultCfg,
+	))
+	require.True(t, common.StringSliceEqual(config.GetDefaultFilter(), originalDefaultCfg))
+}
2 changes: 1 addition & 1 deletion br/pkg/lightning/config/global.go
@@ -100,7 +100,7 @@ func NewGlobalConfig() *GlobalConfig {
 			LogLevel: "error",
 		},
 		Mydumper: GlobalMydumper{
-			Filter: DefaultFilter,
+			Filter: GetDefaultFilter(),
 		},
 		TikvImporter: GlobalImporter{
 			Backend: "",
2 changes: 2 additions & 0 deletions br/pkg/streamhelper/BUILD.bazel
@@ -59,6 +59,8 @@ go_test(
         "tsheap_test.go",
     ],
     flaky = True,
+    race = "on",
+    shard_count = 20,
     deps = [
         ":streamhelper",
         "//br/pkg/errors",
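
Both new attributes are standard rules_go go_test settings: race = "on" runs the tests under the Go race detector, and shard_count = 20 splits the test cases across 20 parallel shard invocations.
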
17 changes: 0 additions & 17 deletions br/pkg/streamhelper/advancer.go
@@ -15,7 +15,6 @@ import (
 	"github.com/pingcap/errors"
 	backuppb "github.com/pingcap/kvproto/pkg/brpb"
 	"github.com/pingcap/log"
-	berrors "github.com/pingcap/tidb/br/pkg/errors"
 	"github.com/pingcap/tidb/br/pkg/logutil"
 	"github.com/pingcap/tidb/br/pkg/streamhelper/config"
 	"github.com/pingcap/tidb/br/pkg/utils"
@@ -461,22 +460,6 @@ func (c *CheckpointAdvancer) advanceCheckpointBy(ctx context.Context, getCheckpo
 	return nil
 }

-// OnTick advances the inner logic clock for the advancer.
-// It's synchronous: this would only return after the events triggered by the clock has all been done.
-// It's generally panic-free, you may not need to trying recover a panic here.
-func (c *CheckpointAdvancer) OnTick(ctx context.Context) (err error) {
-	defer c.recordTimeCost("tick")()
-	defer func() {
-		e := recover()
-		if e != nil {
-			log.Error("panic during handing tick", zap.Stack("stack"), logutil.ShortError(err))
-			err = errors.Annotatef(berrors.ErrUnknown, "panic during handling tick: %s", e)
-		}
-	}()
-	err = c.tick(ctx)
-	return
-}
-
 func (c *CheckpointAdvancer) onConsistencyCheckTick(s *updateSmallTree) error {
 	if s.consistencyCheckTick > 0 {
 		s.consistencyCheckTick--
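
OnTick is not deleted outright; it reappears in advancer_daemon.go below, with the hand-rolled recover block replaced by the utils.PanicToErr helper.
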
82 changes: 25 additions & 57 deletions br/pkg/streamhelper/advancer_daemon.go
@@ -4,80 +4,48 @@ package streamhelper

 import (
 	"context"
-	"time"

 	"github.com/google/uuid"
-	"github.com/pingcap/log"
-	"github.com/pingcap/tidb/br/pkg/logutil"
+	"github.com/pingcap/tidb/br/pkg/utils"
 	"github.com/pingcap/tidb/metrics"
 	"github.com/pingcap/tidb/owner"
 	clientv3 "go.etcd.io/etcd/client/v3"
-	"go.uber.org/zap"
 )

 const (
 	ownerPrompt = "log-backup"
 	ownerPath   = "/tidb/br-stream/owner"
 )

-// AdvancerDaemon is a "high-availability" version of advancer.
-// It involved the manager for electing a owner and doing things.
-// You can embed it into your code by simply call:
-//
-//	ad := NewAdvancerDaemon(adv, mgr)
-//	loop, err := ad.Begin(ctx)
-//
-//	if err != nil {
-//		return err
-//	}
-//
-//	loop()
-type AdvancerDaemon struct {
-	adv     *CheckpointAdvancer
-	manager owner.Manager
-}
+// OnTick advances the inner logic clock for the advancer.
+// It's synchronous: this would only return after the events triggered by the clock has all been done.
+// It's generally panic-free, you may not need to trying recover a panic here.
+func (c *CheckpointAdvancer) OnTick(ctx context.Context) (err error) {
+	defer c.recordTimeCost("tick")()
+	defer utils.PanicToErr(&err)
+	return c.tick(ctx)
+}

-func NewAdvancerDaemon(adv *CheckpointAdvancer, manager owner.Manager) *AdvancerDaemon {
-	return &AdvancerDaemon{
-		adv:     adv,
-		manager: manager,
-	}
+// OnStart implements daemon.Interface.
+func (c *CheckpointAdvancer) OnStart(ctx context.Context) {
+	metrics.AdvancerOwner.Set(1.0)
+	c.StartTaskListener(ctx)
+	go func() {
+		<-ctx.Done()
+		c.onStop()
+	}()
 }

-func OwnerManagerForLogBackup(ctx context.Context, etcdCli *clientv3.Client) owner.Manager {
-	id := uuid.New()
-	return owner.NewOwnerManager(ctx, etcdCli, ownerPrompt, id.String(), ownerPath)
+// Name implements daemon.Interface.
+func (c *CheckpointAdvancer) Name() string {
+	return "LogBackup::Advancer"
 }

-// Begin starts the daemon.
-// It would do some bootstrap task, and return a closure that would begin the main loop.
-func (ad *AdvancerDaemon) Begin(ctx context.Context) (func(), error) {
-	log.Info("begin advancer daemon", zap.String("id", ad.manager.ID()))
-	if err := ad.manager.CampaignOwner(); err != nil {
-		return nil, err
-	}
+func (c *CheckpointAdvancer) onStop() {
+	metrics.AdvancerOwner.Set(0.0)
+}

-	ad.adv.StartTaskListener(ctx)
-	tick := time.NewTicker(ad.adv.cfg.TickDuration)
-	loop := func() {
-		log.Info("begin advancer daemon loop", zap.String("id", ad.manager.ID()))
-		for {
-			select {
-			case <-ctx.Done():
-				log.Info("advancer loop exits", zap.String("id", ad.manager.ID()))
-				return
-			case <-tick.C:
-				log.Debug("deamon tick start", zap.Bool("is-owner", ad.manager.IsOwner()))
-				if ad.manager.IsOwner() {
-					metrics.AdvancerOwner.Set(1.0)
-					if err := ad.adv.OnTick(ctx); err != nil {
-						log.Warn("failed on tick", logutil.ShortError(err))
-					}
-				} else {
-					metrics.AdvancerOwner.Set(0.0)
-				}
-			}
-		}
-	}
-	return loop, nil
+func OwnerManagerForLogBackup(ctx context.Context, etcdCli *clientv3.Client) owner.Manager {
+	id := uuid.New()
+	return owner.NewOwnerManager(ctx, etcdCli, ownerPrompt, id.String(), ownerPath)
 }
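
The net effect of this file's rewrite: the bespoke AdvancerDaemon type and its owner-election loop are gone, and CheckpointAdvancer itself now exposes OnTick, OnStart, and Name, which the new comments say implement daemon.Interface, so ticking and ownership handling move to shared daemon infrastructure. The inline recover block from the old OnTick is replaced by utils.PanicToErr; below is a minimal runnable sketch of what such a helper plausibly does (a hypothetical stand-in, the real br/pkg/utils helper may differ):

```go
package main

import (
	"context"
	"fmt"
)

// panicToErr recovers a panic raised in the function that deferred it
// and surfaces it through that function's named error return.
func panicToErr(err *error) {
	if r := recover(); r != nil {
		*err = fmt.Errorf("panic during tick: %v", r)
	}
}

// onTick mimics CheckpointAdvancer.OnTick's shape: a named error
// return plus a deferred converter keeps a panicking tick from
// killing the daemon.
func onTick(ctx context.Context) (err error) {
	defer panicToErr(&err)
	panic("boom") // stand-in for a failure deep inside tick(ctx)
}

func main() {
	fmt.Println(onTick(context.Background())) // panic during tick: boom
}
```
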
(Diffs for the remaining 162 changed files are not shown here.)
