Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
Merge branch 'chunk-cp' of ssh://github.com/glorv/br into chunk-cp
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv committed May 12, 2021
2 parents 44c7e35 + ec3f807 commit 78dbbc2
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 46 deletions.
2 changes: 1 addition & 1 deletion pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func NewMgr(
return nil, errors.Trace(err)
}
if checkRequirements {
err = version.CheckClusterVersion(ctx, controller.GetPDClient())
err = version.CheckClusterVersion(ctx, controller.GetPDClient(), version.CheckVersionForBR)
if err != nil {
return nil, errors.Annotate(err, "running BR in incompatible version of cluster, "+
"if you believe it's OK, use --check-requirements=false to skip.")
Expand Down
16 changes: 16 additions & 0 deletions pkg/pdutil/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ const (
scheduleConfigPrefix = "pd/api/v1/config/schedule"
pauseTimeout = 5 * time.Minute

// pd request retry time when connection fail
pdRequestRetryTime = 10

// set max-pending-peer-count to a large value to avoid scatter region failed.
maxPendingPeerUnlimited uint64 = math.MaxInt32
)
Expand Down Expand Up @@ -147,6 +150,19 @@ func pdRequest(
if err != nil {
return nil, errors.Trace(err)
}
count := 0
for {
count++
if count > pdRequestRetryTime || resp.StatusCode < 500 {
break
}
resp.Body.Close()
time.Sleep(time.Second)
resp, err = cli.Do(req)
if err != nil {
return nil, errors.Trace(err)
}
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
res, _ := ioutil.ReadAll(resp.Body)
Expand Down
32 changes: 32 additions & 0 deletions pkg/pdutil/pd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"testing"

Expand Down Expand Up @@ -168,3 +169,34 @@ func (s *testPDControllerSuite) TestPDVersion(c *C) {
c.Assert(r.Minor, Equals, expectV.Minor)
c.Assert(r.PreRelease, Equals, expectV.PreRelease)
}

func (s *testPDControllerSuite) TestPDRequestRetry(c *C) {
ctx := context.Background()
count := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
count++
if count <= 5 {
w.WriteHeader(http.StatusGatewayTimeout)
return
}
w.WriteHeader(http.StatusOK)
}))
cli := http.DefaultClient
taddr := ts.URL
_, reqErr := pdRequest(ctx, taddr, "", cli, http.MethodGet, nil)
c.Assert(reqErr, IsNil)
ts.Close()
count = 0
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
count++
if count <= 11 {
w.WriteHeader(http.StatusGatewayTimeout)
return
}
w.WriteHeader(http.StatusOK)
}))
defer ts.Close()
taddr = ts.URL
_, reqErr = pdRequest(ctx, taddr, "", cli, http.MethodGet, nil)
c.Assert(reqErr, NotNil)
}
8 changes: 8 additions & 0 deletions pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"context"
"time"

"github.com/pingcap/br/pkg/version"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -209,6 +211,12 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
return errors.Trace(err)
}
g.Record("Size", utils.ArchiveSize(backupMeta))
backupVersion := version.NormalizeBackupVersion(backupMeta.ClusterVersion)
if cfg.CheckRequirements && backupVersion != nil {
if versionErr := version.CheckClusterVersion(ctx, mgr.GetPDClient(), version.CheckVersionForBackup(backupVersion)); versionErr != nil {
return errors.Trace(versionErr)
}
}

if err = client.InitBackupMeta(backupMeta, u); err != nil {
return errors.Trace(err)
Expand Down
110 changes: 75 additions & 35 deletions pkg/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
"context"
"fmt"
"regexp"
"strconv"
"strings"

"github.com/pingcap/kvproto/pkg/metapb"

"github.com/coreos/go-semver/semver"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
Expand Down Expand Up @@ -74,12 +74,12 @@ func IsTiFlash(store *metapb.Store) bool {
return false
}

// VerChecker is a callback for the CheckClusterVersion, decides whether the cluster is suitable to execute restore.
// See also: CheckVersionForBackup and CheckVersionForBR.
type VerChecker func(store *metapb.Store, ver *semver.Version) error

// CheckClusterVersion check TiKV version.
func CheckClusterVersion(ctx context.Context, client pd.Client) error {
BRVersion, err := semver.NewVersion(removeVAndHash(build.ReleaseVersion))
if err != nil {
return errors.Annotatef(berrors.ErrVersionMismatch, "%s: invalid version, please recompile using `git fetch origin --tags && make build`", err)
}
func CheckClusterVersion(ctx context.Context, client pd.Client, checker VerChecker) error {
stores, err := client.GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
return errors.Trace(err)
Expand All @@ -99,44 +99,67 @@ func CheckClusterVersion(ctx context.Context, client pd.Client) error {
}

tikvVersionString := removeVAndHash(s.Version)
tikvVersion, err := semver.NewVersion(tikvVersionString)
if err != nil {
return errors.Annotatef(berrors.ErrVersionMismatch, "%s: TiKV node %s version %s is invalid", err, s.Address, tikvVersionString)
tikvVersion, getVersionErr := semver.NewVersion(tikvVersionString)
if getVersionErr != nil {
return errors.Annotatef(berrors.ErrVersionMismatch, "%s: TiKV node %s version %s is invalid", getVersionErr, s.Address, tikvVersionString)
}

if tikvVersion.Compare(*minTiKVVersion) < 0 {
return errors.Annotatef(berrors.ErrVersionMismatch, "TiKV node %s version %s don't support BR, please upgrade cluster to %s",
s.Address, tikvVersionString, build.ReleaseVersion)
if checkerErr := checker(s, tikvVersion); checkerErr != nil {
return checkerErr
}
}
return nil
}

if tikvVersion.Major != BRVersion.Major {
return errors.Annotatef(berrors.ErrVersionMismatch, "TiKV node %s version %s and BR %s major version mismatch, please use the same version of BR",
s.Address, tikvVersionString, build.ReleaseVersion)
// CheckVersionForBackup checks the version for backup and
func CheckVersionForBackup(backupVersion *semver.Version) VerChecker {
return func(store *metapb.Store, ver *semver.Version) error {
if backupVersion.Major > ver.Major {
return errors.Annotatef(berrors.ErrVersionMismatch,
"backup with cluster version %s cannot be restored at cluster of version %s: major version mismatches",
backupVersion, ver)
}
return nil
}
}

// BR(https://github.com/pingcap/br/pull/233) and TiKV(https://github.com/tikv/tikv/pull/7241) have breaking changes
// if BR include #233 and TiKV not include #7241, BR will panic TiKV during restore
// These incompatible version is 3.1.0 and 4.0.0-rc.1
if tikvVersion.Major == 3 {
if tikvVersion.Compare(*incompatibleTiKVMajor3) < 0 && BRVersion.Compare(*incompatibleTiKVMajor3) >= 0 {
return errors.Annotatef(berrors.ErrVersionMismatch, "TiKV node %s version %s and BR %s version mismatch, please use the same version of BR",
s.Address, tikvVersionString, build.ReleaseVersion)
}
}
// CheckVersionForBR checks whether version of the cluster and BR itself is compatible.
func CheckVersionForBR(s *metapb.Store, tikvVersion *semver.Version) error {
BRVersion, err := semver.NewVersion(removeVAndHash(build.ReleaseVersion))
if err != nil {
return errors.Annotatef(berrors.ErrVersionMismatch, "%s: invalid version, please recompile using `git fetch origin --tags && make build`", err)
}

if tikvVersion.Major == 4 {
if tikvVersion.Compare(*incompatibleTiKVMajor4) < 0 && BRVersion.Compare(*incompatibleTiKVMajor4) >= 0 {
return errors.Annotatef(berrors.ErrVersionMismatch, "TiKV node %s version %s and BR %s version mismatch, please use the same version of BR",
s.Address, tikvVersionString, build.ReleaseVersion)
}
if tikvVersion.Compare(*minTiKVVersion) < 0 {
return errors.Annotatef(berrors.ErrVersionMismatch, "TiKV node %s version %s don't support BR, please upgrade cluster to %s",
s.Address, tikvVersion, build.ReleaseVersion)
}

if tikvVersion.Major != BRVersion.Major {
return errors.Annotatef(berrors.ErrVersionMismatch, "TiKV node %s version %s and BR %s major version mismatch, please use the same version of BR",
s.Address, tikvVersion, build.ReleaseVersion)
}

// BR(https://github.com/pingcap/br/pull/233) and TiKV(https://github.com/tikv/tikv/pull/7241) have breaking changes
// if BR include #233 and TiKV not include #7241, BR will panic TiKV during restore
// These incompatible version is 3.1.0 and 4.0.0-rc.1
if tikvVersion.Major == 3 {
if tikvVersion.Compare(*incompatibleTiKVMajor3) < 0 && BRVersion.Compare(*incompatibleTiKVMajor3) >= 0 {
return errors.Annotatef(berrors.ErrVersionMismatch, "TiKV node %s version %s and BR %s version mismatch, please use the same version of BR",
s.Address, tikvVersion, build.ReleaseVersion)
}
}

// don't warn if we are the master build, which always have the version v4.0.0-beta.2-*
if build.GitBranch != "master" && tikvVersion.Compare(*BRVersion) > 0 {
log.Warn(fmt.Sprintf("BR version is outdated, please consider use version %s of BR", tikvVersionString))
break
if tikvVersion.Major == 4 {
if tikvVersion.Compare(*incompatibleTiKVMajor4) < 0 && BRVersion.Compare(*incompatibleTiKVMajor4) >= 0 {
return errors.Annotatef(berrors.ErrVersionMismatch, "TiKV node %s version %s and BR %s version mismatch, please use the same version of BR",
s.Address, tikvVersion, build.ReleaseVersion)
}
}

// don't warn if we are the master build, which always have the version v4.0.0-beta.2-*
if build.GitBranch != "master" && tikvVersion.Compare(*BRVersion) > 0 {
log.Warn(fmt.Sprintf("BR version is outdated, please consider use version %s of BR", tikvVersion))
}
return nil
}

Expand Down Expand Up @@ -199,3 +222,20 @@ func CheckTiDBVersion(versionStr string, requiredMinVersion, requiredMaxVersion
}
return CheckVersion("TiDB", *version, requiredMinVersion, requiredMaxVersion)
}

// NormalizeBackupVersion normalizes the version string from backupmeta.
func NormalizeBackupVersion(version string) *semver.Version {
// We need to unquote here because we get the version from PD HTTP API,
// which returns quoted string.
trimmedVerStr := strings.TrimSpace(version)
unquotedVerStr, err := strconv.Unquote(trimmedVerStr)
if err != nil {
unquotedVerStr = trimmedVerStr
}
normalizedVerStr := strings.TrimSpace(unquotedVerStr)
ver, err := semver.NewVersion(normalizedVerStr)
if err != nil {
log.Warn("cannot parse backup version", zap.String("version", normalizedVerStr), zap.Error(err))
}
return ver
}
Loading

0 comments on commit 78dbbc2

Please sign in to comment.