Skip to content

Commit

Permalink
Merge branch 'master' into more_log_build_batch_cop
Browse files Browse the repository at this point in the history
  • Loading branch information
guo-shaoge authored Feb 7, 2023
2 parents 73d9b89 + 52e5a7b commit 360196d
Show file tree
Hide file tree
Showing 22 changed files with 347 additions and 790 deletions.
2 changes: 1 addition & 1 deletion .bazelrc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ build:race --@io_bazel_rules_go//go/config:race --test_env=GORACE=halt_on_error=

test --test_env=TZ=Asia/Shanghai
test --test_output=errors --test_summary=testcase
test:ci --color=yes
test:ci --color=yes --spawn_strategy=local
test:ci --verbose_failures --test_verbose_timeout_warnings
test:ci --test_env=GO_TEST_WRAP_TESTV=1
test:ci --experimental_ui_max_stdouterr_bytes=104857600
Expand Down
84 changes: 50 additions & 34 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -445,8 +445,8 @@ def go_deps():
name = "com_github_census_instrumentation_opencensus_proto",
build_file_proto_mode = "disable_global",
importpath = "github.com/census-instrumentation/opencensus-proto",
sum = "h1:t/LhUZLVitR1Ow2YOnduCsavhwFUklBMoGVYUCqmCqk=",
version = "v0.3.0",
sum = "h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk=",
version = "v0.2.1",
)
go_repository(
name = "com_github_certifi_gocertifi",
Expand Down Expand Up @@ -707,8 +707,8 @@ def go_deps():
name = "com_github_coreos_go_systemd",
build_file_proto_mode = "disable_global",
importpath = "github.com/coreos/go-systemd",
sum = "h1:JOrtw2xFKzlg+cbHpyrpLDmnN1HqhBfnX7WDiW7eG2c=",
version = "v0.0.0-20190719114852-fd7a80b32e1f",
sum = "h1:Wf6HqHfScWJN9/ZjdUKyjop4mf3Qdd+1TvvltAvM3m8=",
version = "v0.0.0-20190321100706-95778dfbb74e",
)
go_repository(
name = "com_github_coreos_go_systemd_v22",
Expand Down Expand Up @@ -1092,8 +1092,8 @@ def go_deps():
name = "com_github_fogleman_gg",
build_file_proto_mode = "disable_global",
importpath = "github.com/fogleman/gg",
sum = "h1:/7zJX8F6AaYQc57WQCyN9cAIz+4bCJGO9B+dyW29am8=",
version = "v1.3.0",
sum = "h1:WXb3TSNmHp2vHoCroCIB1foO/yQ36swABL8aOVeDpgg=",
version = "v1.2.1-0.20190220221249-0403632d5b90",
)
go_repository(
name = "com_github_form3tech_oss_jwt_go",
Expand Down Expand Up @@ -1192,15 +1192,15 @@ def go_deps():
name = "com_github_gin_contrib_sse",
build_file_proto_mode = "disable_global",
importpath = "github.com/gin-contrib/sse",
sum = "h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=",
version = "v0.1.0",
sum = "h1:t8FVkw33L+wilf2QiWkw0UV77qRpcH/JHPKGpKa2E8g=",
version = "v0.0.0-20190301062529-5545eab6dad3",
)
go_repository(
name = "com_github_gin_gonic_gin",
build_file_proto_mode = "disable_global",
importpath = "github.com/gin-gonic/gin",
sum = "h1:4+fr/el88TOO3ewCmQr8cx/CtZ/umlIRIs5M4NTNjf8=",
version = "v1.8.1",
sum = "h1:3tMoCCfM7ppqsR0ptz/wi1impNpT7/9wQtMZ8lr1mCQ=",
version = "v1.4.0",
)
go_repository(
name = "com_github_go_check_check",
Expand Down Expand Up @@ -3344,8 +3344,8 @@ def go_deps():
name = "com_github_pingcap_failpoint",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/failpoint",
sum = "h1:kJolJWbyadVeL8RKBlqmXQR7FRKPsIeU85TUYyhbhiQ=",
version = "v0.0.0-20220423142525-ae43b7f4e5c3",
sum = "h1:CgbKAHto5CQgWM9fSBIvaxsJHuGP0uM74HXtv3MyyGQ=",
version = "v0.0.0-20220801062533-2eaa32854a6c",
)
go_repository(
name = "com_github_pingcap_fn",
Expand All @@ -3365,8 +3365,8 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sum = "h1:ZiCJcEzmmF5xNgt8GIXekd3WQXI/22kzYQnrHi3Fc/4=",
version = "v0.0.0-20230201112839-2b853bed8125",
sum = "h1:QcC52K9hhsP6eVmQBnSMI/b8TiOUVztbaeduTXspmeQ=",
version = "v0.0.0-20230206112125-0561adc37543",
)
go_repository(
name = "com_github_pingcap_log",
Expand Down Expand Up @@ -3938,8 +3938,8 @@ def go_deps():
name = "com_github_stackexchange_wmi",
build_file_proto_mode = "disable",
importpath = "github.com/StackExchange/wmi",
sum = "h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk=",
version = "v0.0.0-20190523213315-cbe66965904d",
sum = "h1:5ZfJxyXo8KyX8DgGXC5B7ILL8y51fci/qYz2B4j8iLY=",
version = "v0.0.0-20180725035823-b12b22c5341f",
)

go_repository(
Expand Down Expand Up @@ -4085,8 +4085,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:j2s6Gechj46t1GWxE0vZEPBqgp7sc7mb3v0srBmf5Tw=",
version = "v2.0.5-0.20230202101145-8fd09cd88cce",
sum = "h1:gkAF7XxM2mfh5ZHbyLhXkaKyDd97soe1SMFIZ2vW260=",
version = "v2.0.6-0.20230207040004-9b3ecc1dcaa9",
)
go_repository(
name = "com_github_tikv_pd",
Expand All @@ -4100,8 +4100,8 @@ def go_deps():
name = "com_github_tikv_pd_client",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/pd/client",
sum = "h1:xHPPej9Z8IrYdyLED4byOGtGm/7yhxyRUf4m93CpDyg=",
version = "v0.0.0-20230202094356-18df271ce57f",
sum = "h1:lzk+XYHs5iJ1lIxoza4Na3vLl/Z+y/qhpbkLS34WxaE=",
version = "v0.0.0-20230206191557-2a7c8d4c9676",
)
go_repository(
name = "com_github_timakin_bodyclose",
Expand Down Expand Up @@ -4194,8 +4194,8 @@ def go_deps():
name = "com_github_ugorji_go_codec",
build_file_proto_mode = "disable_global",
importpath = "github.com/ugorji/go/codec",
sum = "h1:YPXUKf7fYbp/y8xloBqZOw2qaVggbfwMlI8WM3wZUJ0=",
version = "v1.2.7",
sum = "h1:3SVOIvH7Ae1KRYyQWRjXWJEA9sS/c/pjvH++55Gr648=",
version = "v0.0.0-20181204163529-d75b2dcb6bc8",
)
go_repository(
name = "com_github_ultraware_funlen",
Expand Down Expand Up @@ -4873,8 +4873,8 @@ def go_deps():
name = "com_google_cloud_go_iam",
build_file_proto_mode = "disable_global",
importpath = "cloud.google.com/go/iam",
sum = "h1:k4MuwOsS7zGJJ+QfZ5vBK8SgHBAvYN/23BWsiihJ1vs=",
version = "v0.7.0",
sum = "h1:E2osAkZzxI/+8pZcxVLcDtAQx/u+hZXVryUaYQ5O0Kk=",
version = "v0.8.0",
)
go_repository(
name = "com_google_cloud_go_iap",
Expand Down Expand Up @@ -4939,6 +4939,14 @@ def go_deps():
sum = "h1:3Kdajn6X25yWQFhFCErmKSYTSvkEd3chJROny//F1A0=",
version = "v1.4.0",
)
go_repository(
name = "com_google_cloud_go_maps",
build_file_proto_mode = "disable",
importpath = "cloud.google.com/go/maps",
sum = "h1:kLReRbclTgJefw2fcCbdLPLhPj0U6UUWN10ldG8sdOU=",
version = "v0.1.0",
)

go_repository(
name = "com_google_cloud_go_mediatranslation",
build_file_proto_mode = "disable",
Expand Down Expand Up @@ -5291,6 +5299,14 @@ def go_deps():
sum = "h1:A2Tl2ZmwMRpvEmhV2ibISY85fmQR+Y5w9a0PlRz5P3s=",
version = "v1.3.0",
)
go_repository(
name = "com_google_cloud_go_vmwareengine",
build_file_proto_mode = "disable",
importpath = "cloud.google.com/go/vmwareengine",
sum = "h1:JMPZaOT/gIUxVlTqSl/QQ32Y2k+r0stNeM1NSqhVP9o=",
version = "v0.1.0",
)

go_repository(
name = "com_google_cloud_go_vpcaccess",
build_file_proto_mode = "disable",
Expand Down Expand Up @@ -5460,8 +5476,8 @@ def go_deps():
name = "in_gopkg_natefinch_lumberjack_v2",
build_file_proto_mode = "disable_global",
importpath = "gopkg.in/natefinch/lumberjack.v2",
sum = "h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=",
version = "v2.0.0",
sum = "h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=",
version = "v2.2.1",
)
go_repository(
name = "in_gopkg_resty_v1",
Expand Down Expand Up @@ -5814,15 +5830,15 @@ def go_deps():
name = "org_golang_google_genproto",
build_file_proto_mode = "disable_global",
importpath = "google.golang.org/genproto",
sum = "h1:OjndDrsik+Gt+e6fs45z9AxiewiKyLKYpA45W5Kpkks=",
version = "v0.0.0-20221202195650-67e5cbc046fd",
sum = "h1:vArvWooPH749rNHpBGgVl+U9B9dATjiEhJzcWGlovNs=",
version = "v0.0.0-20230202175211-008b39050e57",
)
go_repository(
name = "org_golang_google_grpc",
build_file_proto_mode = "disable_global",
importpath = "google.golang.org/grpc",
sum = "h1:E1eGv1FTqoLIdnBCZufiSHgKjlqG6fKFf6pPWtMTh8U=",
version = "v1.51.0",
sum = "h1:pf7sOysg4LdgBqduXveGKrcEwbStiK2rtfghdzlUYDQ=",
version = "v1.52.3",
)
go_repository(
name = "org_golang_google_grpc_cmd_protoc_gen_go_grpc",
Expand Down Expand Up @@ -5864,8 +5880,8 @@ def go_deps():
name = "org_golang_x_image",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/image",
sum = "h1:5h3ngYt7+vXCDZCup/HkCQgW5XwmSvR/nA2JmJ0RErg=",
version = "v0.0.0-20200119044424-58c23975cae1",
sum = "h1:+qEpEAPhDZ1o0x3tHzZTQDArnOixOzGD9HUJfcg0mb4=",
version = "v0.0.0-20190802002840-cff245a6509b",
)
go_repository(
name = "org_golang_x_lint",
Expand Down Expand Up @@ -5913,8 +5929,8 @@ def go_deps():
name = "org_golang_x_sys",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/sys",
sum = "h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18=",
version = "v0.4.0",
sum = "h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=",
version = "v0.5.0",
)
go_repository(
name = "org_golang_x_term",
Expand Down
39 changes: 35 additions & 4 deletions ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
Expand Down Expand Up @@ -99,6 +98,16 @@ func recoverPDSchedule(pdScheduleParam map[string]interface{}) error {
return infosync.SetPDScheduleConfig(context.Background(), pdScheduleParam)
}

func getStoreGlobalMinSafeTS(s kv.Storage) time.Time {
minSafeTS := s.GetMinSafeTS(kv.GlobalTxnScope)
// Inject mocked SafeTS for test.
failpoint.Inject("injectSafeTS", func(val failpoint.Value) {
injectTS := val.(int)
minSafeTS = uint64(injectTS)
})
return oracle.GetTimeFromTS(minSafeTS)
}

// ValidateFlashbackTS validates that flashBackTS in range [gcSafePoint, currentTS).
func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBackTS uint64) error {
currentTS, err := sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0)
Expand All @@ -111,12 +120,34 @@ func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBack
}
currentTS = currentVer.Ver
}
if oracle.GetTimeFromTS(flashBackTS).After(oracle.GetTimeFromTS(currentTS)) {
oracleFlashbackTS := oracle.GetTimeFromTS(flashBackTS)
if oracleFlashbackTS.After(oracle.GetTimeFromTS(currentTS)) {
return errors.Errorf("cannot set flashback timestamp to future time")
}
if oracle.GetTimeFromTS(flashBackTS).After(expression.GetMinSafeTime(sctx)) {
return errors.Errorf("cannot set flashback timestamp to too close to present time")

flashbackGetMinSafeTimeTimeout := time.Minute
failpoint.Inject("changeFlashbackGetMinSafeTimeTimeout", func(val failpoint.Value) {
t := val.(int)
flashbackGetMinSafeTimeTimeout = time.Duration(t)
})

start := time.Now()
minSafeTime := getStoreGlobalMinSafeTS(sctx.GetStore())
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for oracleFlashbackTS.After(minSafeTime) {
if time.Since(start) >= flashbackGetMinSafeTimeTimeout {
return errors.Errorf("cannot set flashback timestamp after min-resolved-ts(%s)", minSafeTime)
}
select {
case <-ticker.C:
minSafeTime = getStoreGlobalMinSafeTS(sctx.GetStore())
break
case <-ctx.Done():
return ctx.Err()
}
}

gcSafePoint, err := gcutil.GetGCSafePoint(sctx)
if err != nil {
return err
Expand Down
28 changes: 8 additions & 20 deletions ddl/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,7 @@ func TestFlashbackCloseAndResetPDSchedule(t *testing.T) {

injectSafeTS := oracle.GoTimeToTS(time.Now().Add(10 * time.Second))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`))
require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))

oldValue := map[string]interface{}{
Expand Down Expand Up @@ -111,8 +109,7 @@ func TestFlashbackCloseAndResetPDSchedule(t *testing.T) {
require.EqualValues(t, finishValue["merge-schedule-limit"], 1)

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS"))
}

func TestAddDDLDuringFlashback(t *testing.T) {
Expand All @@ -128,9 +125,7 @@ func TestAddDDLDuringFlashback(t *testing.T) {

injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`))
require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))

timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk)
Expand All @@ -150,8 +145,7 @@ func TestAddDDLDuringFlashback(t *testing.T) {

dom.DDL().SetHook(originHook)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS"))
}

func TestGlobalVariablesOnFlashback(t *testing.T) {
Expand All @@ -167,9 +161,7 @@ func TestGlobalVariablesOnFlashback(t *testing.T) {

injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`))
require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))

timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk)
Expand Down Expand Up @@ -232,8 +224,7 @@ func TestGlobalVariablesOnFlashback(t *testing.T) {

dom.DDL().SetHook(originHook)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS"))
}

func TestCancelFlashbackCluster(t *testing.T) {
Expand All @@ -247,9 +238,7 @@ func TestCancelFlashbackCluster(t *testing.T) {

injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`))
require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))

timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk)
Expand Down Expand Up @@ -284,6 +273,5 @@ func TestCancelFlashbackCluster(t *testing.T) {
dom.DDL().SetHook(originHook)

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS"))
}
7 changes: 2 additions & 5 deletions ddl/tiflashtest/ddl_tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,9 +458,7 @@ func TestTiFlashFlashbackCluster(t *testing.T) {

injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`))
require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))

ChangeGCSafePoint(tk, time.Now().Add(-10*time.Second), "true", "10m0s")
Expand All @@ -473,8 +471,7 @@ func TestTiFlashFlashbackCluster(t *testing.T) {
tk.MustGetErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)), errorMsg)

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS"))
}

func CheckTableAvailableWithTableName(dom *domain.Domain, t *testing.T, count uint64, labels []string, db string, table string) {
Expand Down
Loading

0 comments on commit 360196d

Please sign in to comment.