Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into support_replayer_…
Browse files Browse the repository at this point in the history
…task
  • Loading branch information
Yisaer committed Nov 10, 2022
2 parents eb0bad4 + 066e9c8 commit c445b51
Show file tree
Hide file tree
Showing 123 changed files with 3,794 additions and 1,244 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3495,8 +3495,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:x5L32xr5KE2C0UhlKfrsDTWQ/8fpFYQLq5xn+ph0sQ0=",
version = "v2.0.3-0.20221103084528-ec1202a9d26f",
sum = "h1:nFVdyTXcQYZwQQCdSJcFI1vBFyzG1hVuZ39MAK6wqK4=",
version = "v2.0.3-0.20221108030801-9c0835c80eba",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ bazel_coverage_test: failpoint-enable bazel_ci_prepare
-- //... -//cmd/... -//tests/graceshutdown/... \
-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/...
bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) \
--build_event_json_file=bazel_2.json --@io_bazel_rules_go//go/config:cover_format=go_cover \
--build_event_json_file=bazel_2.json --@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=featuretag \
-- //... -//cmd/... -//tests/graceshutdown/... \
-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/...

Expand Down
2 changes: 2 additions & 0 deletions autoid_service/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "github.com/pingcap/tidb/autoid_service",
visibility = ["//visibility:public"],
deps = [
"//config",
"//kv",
"//meta",
"//metrics",
Expand All @@ -16,6 +17,7 @@ go_library(
"@com_github_pingcap_kvproto//pkg/autoid",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//keepalive",
"@org_uber_go_zap//:zap",
],
)
21 changes: 18 additions & 3 deletions autoid_service/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ package autoid

import (
"context"
"crypto/tls"
"math"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/autoid"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/metrics"
Expand All @@ -31,6 +33,7 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

var (
Expand Down Expand Up @@ -245,10 +248,22 @@ type Service struct {
}

// New return a Service instance.
func New(selfAddr string, etcdAddr []string, store kv.Storage) *Service {
func New(selfAddr string, etcdAddr []string, store kv.Storage, tlsConfig *tls.Config) *Service {
cfg := config.GetGlobalConfig()
etcdLogCfg := zap.NewProductionConfig()
cli, err := clientv3.New(clientv3.Config{
Endpoints: etcdAddr,
DialTimeout: time.Second,
LogConfig: &etcdLogCfg,
Endpoints: etcdAddr,
AutoSyncInterval: 30 * time.Second,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithBackoffMaxDelay(time.Second * 3),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Duration(cfg.TiKVClient.GrpcKeepAliveTime) * time.Second,
Timeout: time.Duration(cfg.TiKVClient.GrpcKeepAliveTimeout) * time.Second,
}),
},
TLS: tlsConfig,
})
if err != nil {
panic(err)
Expand Down
24 changes: 14 additions & 10 deletions bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ func (h *BindHandle) Reset(ctx sessionctx.Context) {
h.bindInfo.parser = parser.New()
h.invalidBindRecordMap.Value.Store(make(map[string]*bindRecordUpdate))
h.invalidBindRecordMap.flushFunc = func(record *BindRecord) error {
return h.DropBindRecord(record.OriginalSQL, record.Db, &record.Bindings[0])
_, err := h.DropBindRecord(record.OriginalSQL, record.Db, &record.Bindings[0])
return err
}
h.pendingVerifyBindRecordMap.Value.Store(make(map[string]*bindRecordUpdate))
h.pendingVerifyBindRecordMap.flushFunc = func(record *BindRecord) error {
Expand Down Expand Up @@ -368,7 +369,7 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord)
}

// DropBindRecord drops a BindRecord to the storage and BindRecord int the cache.
func (h *BindHandle) DropBindRecord(originalSQL, db string, binding *Binding) (err error) {
func (h *BindHandle) DropBindRecord(originalSQL, db string, binding *Binding) (deletedRows uint64, err error) {
db = strings.ToLower(db)
h.bindInfo.Lock()
h.sctx.Lock()
Expand All @@ -380,9 +381,8 @@ func (h *BindHandle) DropBindRecord(originalSQL, db string, binding *Binding) (e
exec, _ := h.sctx.Context.(sqlexec.SQLExecutor)
_, err = exec.ExecuteInternal(ctx, "BEGIN PESSIMISTIC")
if err != nil {
return err
return 0, err
}
var deleteRows int
defer func() {
if err != nil {
_, err1 := exec.ExecuteInternal(ctx, "ROLLBACK")
Expand All @@ -391,7 +391,7 @@ func (h *BindHandle) DropBindRecord(originalSQL, db string, binding *Binding) (e
}

_, err = exec.ExecuteInternal(ctx, "COMMIT")
if err != nil || deleteRows == 0 {
if err != nil || deletedRows == 0 {
return
}

Expand All @@ -404,7 +404,7 @@ func (h *BindHandle) DropBindRecord(originalSQL, db string, binding *Binding) (e

// Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances.
if err = h.lockBindInfoTable(); err != nil {
return err
return 0, err
}

updateTs := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3).String()
Expand All @@ -416,9 +416,11 @@ func (h *BindHandle) DropBindRecord(originalSQL, db string, binding *Binding) (e
_, err = exec.ExecuteInternal(ctx, `UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE original_sql = %? AND update_time < %? AND bind_sql = %? and status != %?`,
deleted, updateTs, originalSQL, updateTs, binding.BindSQL, deleted)
}
if err != nil {
return 0, err
}

deleteRows = int(h.sctx.Context.GetSessionVars().StmtCtx.AffectedRows())
return err
return h.sctx.Context.GetSessionVars().StmtCtx.AffectedRows(), nil
}

// SetBindRecordStatus set a BindRecord's status to the storage and bind cache.
Expand Down Expand Up @@ -1185,7 +1187,8 @@ func (h *BindHandle) HandleEvolvePlanTask(sctx sessionctx.Context, adminEvolve b
// since it is still in the bind record. Now we just drop it and if it is actually retryable,
// we will hope for that we can capture this evolve task again.
if err != nil {
return h.DropBindRecord(originalSQL, db, &binding)
_, err = h.DropBindRecord(originalSQL, db, &binding)
return err
}
// If the accepted plan timeouts, it is hard to decide the timeout for verify plan.
// Currently we simply mark the verify plan as `using` if it could run successfully within maxTime.
Expand All @@ -1195,7 +1198,8 @@ func (h *BindHandle) HandleEvolvePlanTask(sctx sessionctx.Context, adminEvolve b
sctx.GetSessionVars().UsePlanBaselines = false
verifyPlanTime, err := h.getRunningDuration(sctx, db, binding.BindSQL, maxTime)
if err != nil {
return h.DropBindRecord(originalSQL, db, &binding)
_, err = h.DropBindRecord(originalSQL, db, &binding)
return err
}
if verifyPlanTime == -1 || (float64(verifyPlanTime)*acceptFactor > float64(currentPlanTime)) {
binding.Status = Rejected
Expand Down
1 change: 1 addition & 0 deletions bindinfo/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ func TestGlobalBinding(t *testing.T) {
require.NotNil(t, bind.UpdateTime)

_, err = tk.Exec("drop global " + testSQL.dropSQL)
require.Equal(t, uint64(1), tk.Session().AffectedRows())
require.NoError(t, err)
bindData = dom.BindHandle().GetBindRecord(hash, sql, "test")
require.Nil(t, bindData)
Expand Down
1 change: 0 additions & 1 deletion br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ go_test(
"//testkit/testsetup",
"//types",
"//util/codec",
"//util/mathutil",
"@com_github_fsouza_fake_gcs_server//fakestorage",
"@com_github_golang_protobuf//proto",
"@com_github_pingcap_errors//:errors",
Expand Down
11 changes: 9 additions & 2 deletions br/pkg/restore/log_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,12 +300,19 @@ func testReadFromMetadataWithVersion(t *testing.T, m metaMaker) {

meta := new(StreamMetadataSet)
meta.Helper = stream.NewMetadataHelper()
meta.LoadUntil(ctx, loc, c.untilTS)
meta.LoadUntilAndCalculateShiftTS(ctx, loc, c.untilTS)

var metas []*backuppb.Metadata
for _, m := range meta.metadata {
for path := range meta.metadataInfos {
data, err := loc.ReadFile(ctx, path)
require.NoError(t, err)

m, err := meta.Helper.ParseToMetadataHard(data)
require.NoError(t, err)

metas = append(metas, m)
}

actualStoreIDs := make([]int64, 0, len(metas))
for _, meta := range metas {
actualStoreIDs = append(actualStoreIDs, meta.StoreId)
Expand Down
Loading

0 comments on commit c445b51

Please sign in to comment.