Cherry-pick commits to release-2.0. #1050

Merged · 14 commits · May 8, 2018
38 changes: 38 additions & 0 deletions CHANGELOG.md
@@ -0,0 +1,38 @@
# PD Change Log

## v2.0.0-GA
### New Feature
* Support using pd-ctl to scatter specified Regions for manually adjusting hotspot Regions in some cases
### Improvements
* Improve configuration check rules to prevent unreasonable scheduling configuration
* Optimize the scheduling strategy when a TiKV node has insufficient space, to keep the disk from filling up completely
* Optimize hot-region scheduler execution efficiency and add more metrics
* Optimize Region health check logic to avoid generating redundant schedule operators

## v2.0.0-rc.5
### New Feature
* Support adding the learner node
### Improvements
* Optimize the Balance Region Scheduler to reduce scheduling overhead
* Adjust the default value of `schedule-limit` configuration
* Fix the compatibility issue when adding a new scheduler
### Bug Fix
* Fix the issue of allocating IDs frequently

## v2.0.0-rc.4
### New Feature
* Support splitting Region manually to handle the hot spot in a single Region
### Improvements
* Optimize metrics
### Bug Fix
* Fix the issue that the label property is not displayed when `pdctl` runs `config show all`

## v2.0.0-rc.3
### New Feature
* Support Region Merge, to merge empty Regions or small Regions after deleting data
### Improvements
* Ignore the nodes that have many pending peers when adding replicas, to improve the speed of restoring replicas or taking nodes offline
* Optimize the scheduling speed of leader balance in scenarios of unbalanced resources within different labels
* Add more statistics about abnormal Regions
### Bug Fix
* Fix the frequent scheduling issue caused by a large number of empty Regions
4 changes: 3 additions & 1 deletion README.md
@@ -13,7 +13,9 @@ PD supports distribution and fault-tolerance by embedding [etcd](https://github.
## Build

1. Make sure [*Go*](https://golang.org/) (version 1.8+) is installed.
2. Use `make` to install PD. PD is installed in the `bin` directory.
2. Ensure your `$GOPATH` is set. (For example, `export GOPATH=$HOME/go`)
3. Clone the repository with `git clone git@github.com:pingcap/pd.git $GOPATH/src/github.com/pingcap/pd`.
4. Use `make` to install PD. PD is installed in the `bin` directory.

## Usage

13 changes: 2 additions & 11 deletions cmd/pd-server/main.go
@@ -17,8 +17,6 @@ import (
"flag"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"

"github.com/grpc-ecosystem/go-grpc-prometheus"
@@ -51,19 +49,12 @@ func main() {
case flag.ErrHelp:
os.Exit(0)
default:
log.Fatalf("parse cmd flags error: %s\n", err)
}

dataDir, err := filepath.Abs(cfg.DataDir)
logFile, err := filepath.Abs(cfg.Log.File.Filename)
rel, err := filepath.Rel(dataDir, filepath.Dir(logFile))
if !strings.HasPrefix(rel, "..") {
log.Fatalf("initialize logger error: log directory shouldn't be the subdirectory of data directory")
log.Fatalf("parse cmd flags error: %s\n", errors.ErrorStack(err))
}

err = logutil.InitLogger(&cfg.Log)
if err != nil {
log.Fatalf("initialize logger error: %s\n", err)
log.Fatalf("initialize logger error: %s\n", errors.ErrorStack(err))
}

server.LogPDInfo()
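The main.go change above drops the log-directory-vs-data-directory check and switches both fatal logs to `errors.ErrorStack`, so startup failures print the full annotated error chain instead of a bare message. A minimal standalone sketch of that logging pattern, assuming the errors package imported here is `github.com/juju/errors` (the import is not visible in this hunk):

```go
package main

import (
	"log"

	"github.com/juju/errors" // assumption: not shown in the hunk above
)

func parseFlags() error {
	// Annotate wraps the cause and records where the wrapping happened.
	return errors.Annotate(errors.New("unknown flag --foo"), "parse cmd flags error")
}

func main() {
	if err := parseFlags(); err != nil {
		// ErrorStack renders the whole annotation chain, one frame per line,
		// which is the extra detail this cherry-pick adds to the fatal logs.
		log.Fatalf("parse cmd flags error: %s\n", errors.ErrorStack(err))
	}
}
```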
2 changes: 1 addition & 1 deletion conf/config.toml
@@ -61,7 +61,7 @@ leader-schedule-limit = 4
region-schedule-limit = 4
replica-schedule-limit = 8
merge-schedule-limit = 8
tolerant-size-ratio = 2.5
tolerant-size-ratio = 5.0

# customized schedulers, the format is as below
# if empty, it will use balance-leader, balance-region, hot-region as default
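Raising the default `tolerant-size-ratio` from 2.5 to 5.0 makes balance scheduling more conservative: a larger ratio widens the score gap a source and target store must show before a balance operator is created. The sketch below is only an illustration of that idea with made-up names and a simplified formula, not PD's actual shouldBalance logic:

```go
package main

import "fmt"

// Illustrative only: shows how a tolerant-size-ratio style knob damps balancing.
// The names and formula are assumptions, not PD's real implementation.
func shouldBalance(sourceScore, targetScore, avgRegionSize, tolerantSizeRatio float64) bool {
	tolerance := avgRegionSize * tolerantSizeRatio
	return sourceScore-targetScore > tolerance
}

func main() {
	// With ratio 2.5 this pair would be balanced; with 5.0 it is left alone.
	fmt.Println(shouldBalance(960, 640, 96, 2.5)) // true
	fmt.Println(shouldBalance(960, 640, 96, 5.0)) // false
}
```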
29 changes: 29 additions & 0 deletions pdctl/command/operator.go
@@ -80,6 +80,7 @@ func NewAddOperatorCommand() *cobra.Command {
c.AddCommand(NewRemovePeerCommand())
c.AddCommand(NewMergeRegionCommand())
c.AddCommand(NewSplitRegionCommand())
c.AddCommand(NewScatterRegionCommand())
return c
}

@@ -286,6 +287,34 @@ func splitRegionCommandFunc(cmd *cobra.Command, args []string) {
postJSON(cmd, operatorsPrefix, input)
}

// NewScatterRegionCommand returns a command to scatter a region.
func NewScatterRegionCommand() *cobra.Command {
c := &cobra.Command{
Use: "scatter-region <region_id>",
Short: "scatter a region",
Run: scatterRegionCommandFunc,
}
return c
}

func scatterRegionCommandFunc(cmd *cobra.Command, args []string) {
if len(args) != 1 {
fmt.Println(cmd.UsageString())
return
}

ids, err := parseUint64s(args)
if err != nil {
fmt.Println(err)
return
}

input := make(map[string]interface{})
input["name"] = cmd.Name()
input["region_id"] = ids[0]
postJSON(cmd, operatorsPrefix, input)
}

// NewRemoveOperatorCommand returns a command to remove operators.
func NewRemoveOperatorCommand() *cobra.Command {
c := &cobra.Command{
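For context, the new pd-ctl subcommand is invoked as `operator add scatter-region <region_id>` and simply posts `{"name": "scatter-region", "region_id": <id>}` to the operators endpoint. A rough sketch of an equivalent direct HTTP call; the `/pd/api/v1/operators` path is an assumption, since the value of `operatorsPrefix` is not shown in this diff:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// scatterRegion posts the same payload scatterRegionCommandFunc builds.
func scatterRegion(pdAddr string, regionID uint64) error {
	body, err := json.Marshal(map[string]interface{}{
		"name":      "scatter-region",
		"region_id": regionID,
	})
	if err != nil {
		return err
	}
	// Assumed endpoint; pd-ctl resolves this through operatorsPrefix.
	resp, err := http.Post(pdAddr+"/pd/api/v1/operators", "application/json", bytes.NewReader(body))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("scatter region %d: unexpected status %s", regionID, resp.Status)
	}
	return nil
}

func main() {
	if err := scatterRegion("http://127.0.0.1:2379", 42); err != nil {
		fmt.Println(err)
	}
}
```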
21 changes: 14 additions & 7 deletions server/api/config.go
@@ -49,18 +49,22 @@ func (h *confHandler) Post(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
err = json.Unmarshal(data, &config.Schedule)
if err != nil {
if err := json.Unmarshal(data, &config.Schedule); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
err = json.Unmarshal(data, &config.Replication)
if err != nil {
if err := json.Unmarshal(data, &config.Replication); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
if err := h.svr.SetScheduleConfig(config.Schedule); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
if err := h.svr.SetReplicationConfig(config.Replication); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.svr.SetScheduleConfig(config.Schedule)
h.svr.SetReplicationConfig(config.Replication)
h.rd.JSON(w, http.StatusOK, nil)
}

@@ -76,7 +80,10 @@ func (h *confHandler) SetSchedule(w http.ResponseWriter, r *http.Request) {
return
}

h.svr.SetScheduleConfig(*config)
if err := h.svr.SetScheduleConfig(*config); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, nil)
}

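The handler above unmarshals the same request body twice, once into `config.Schedule` and once into `config.Replication`, and now also surfaces `SetScheduleConfig`/`SetReplicationConfig` failures as HTTP 500s instead of ignoring them. A small sketch of the double-unmarshal idea, with hypothetical field names standing in for PD's config structs:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical stand-ins for PD's ScheduleConfig / ReplicationConfig fields.
type scheduleConfig struct {
	LeaderScheduleLimit uint64 `json:"leader-schedule-limit"`
}

type replicationConfig struct {
	MaxReplicas uint64 `json:"max-replicas"`
}

func main() {
	data := []byte(`{"leader-schedule-limit": 4, "max-replicas": 3}`)
	var s scheduleConfig
	var r replicationConfig
	// Unmarshal the same body twice; each struct only picks up the keys it
	// declares, so a single POST can update both config sections at once.
	if err := json.Unmarshal(data, &s); err != nil {
		panic(err)
	}
	if err := json.Unmarshal(data, &r); err != nil {
		panic(err)
	}
	fmt.Println(s.LeaderScheduleLimit, r.MaxReplicas) // 4 3
}
```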
4 changes: 1 addition & 3 deletions server/api/label.go
@@ -72,8 +72,6 @@ func (h *labelsHandler) GetStores(w http.ResponseWriter, r *http.Request) {
return
}

maxDownTime := h.svr.GetScheduleConfig().MaxStoreDownTime.Duration

stores := cluster.GetStores()
storesInfo := &StoresInfo{
Stores: make([]*StoreInfo, 0, len(stores)),
@@ -87,7 +85,7 @@
return
}

storeInfo := newStoreInfo(store, maxDownTime)
storeInfo := newStoreInfo(h.svr.GetScheduleConfig(), store)
storesInfo.Stores = append(storesInfo.Stores, storeInfo)
}
storesInfo.Count = len(storesInfo.Stores)
34 changes: 26 additions & 8 deletions server/api/member_test.go
@@ -18,7 +18,6 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"math/rand"
"net/http"
@@ -211,14 +210,14 @@ func (s *testMemberAPISuite) TestLeaderResign(c *C) {
leader1, err := svrs[0].GetLeader()
c.Assert(err, IsNil)

s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/leader/resign", nil)
s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/leader/resign", "")
leader2 := s.waitLeaderChange(c, svrs[0], leader1)
s.post(c, addrs[leader2.GetMemberId()]+apiPrefix+"/api/v1/leader/transfer/"+leader1.GetName(), nil)
s.post(c, addrs[leader2.GetMemberId()]+apiPrefix+"/api/v1/leader/transfer/"+leader1.GetName(), "")
leader3 := s.waitLeaderChange(c, svrs[0], leader2)
c.Assert(leader3.GetMemberId(), Equals, leader1.GetMemberId())
}

func (s *testMemberAPISuite) TestEtcdLeaderPriority(c *C) {
func (s *testMemberAPISuite) TestLeaderPriority(c *C) {
cfgs, svrs, clean := mustNewCluster(c, 3)
defer clean()

@@ -229,17 +228,24 @@ func (s *testMemberAPISuite) TestEtcdLeaderPriority(c *C) {

leader1, err := s.getEtcdLeader(svrs[0])
c.Assert(err, IsNil)
s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/members/name/"+leader1.GetName(), bytes.NewBufferString(`{"leader-priority": -1}`))
s.waitLeaderSync(c, svrs[0], leader1)
s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/members/name/"+leader1.GetName(), `{"leader-priority": -1}`)
leader2 := s.waitEtcdLeaderChange(c, svrs[0], leader1)
s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/members/name/"+leader1.GetName(), bytes.NewBufferString(`{"leader-priority": 100}`))
s.waitLeaderSync(c, svrs[0], leader2)
s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/members/name/"+leader1.GetName(), `{"leader-priority": 100}`)
leader3 := s.waitEtcdLeaderChange(c, svrs[0], leader2)
s.waitLeaderSync(c, svrs[0], leader3)
c.Assert(leader3.GetMemberId(), Equals, leader1.GetMemberId())
}

func (s *testMemberAPISuite) post(c *C, url string, body io.Reader) {
func (s *testMemberAPISuite) post(c *C, url string, body string) {
testutil.WaitUntil(c, func(c *C) bool {
res, err := http.Post(url, "", body)
res, err := http.Post(url, "", bytes.NewBufferString(body))
c.Assert(err, IsNil)
b, err := ioutil.ReadAll(res.Body)
res.Body.Close()
c.Assert(err, IsNil)
c.Logf("post %s, status: %v res: %s", url, res.StatusCode, string(b))
return res.StatusCode == http.StatusOK
})
}
@@ -288,3 +294,15 @@ func (s *testMemberAPISuite) waitEtcdLeaderChange(c *C, svr *server.Server, old
})
return leader
}

func (s *testMemberAPISuite) waitLeaderSync(c *C, svr *server.Server, etcdLeader *pdpb.Member) {
testutil.WaitUntil(c, func(c *C) bool {
leader, err := svr.GetLeader()
if err != nil {
c.Logf("GetLeader err: %v", err)
return false
}
c.Logf("leader is %v", leader.GetMemberId())
return leader.GetMemberId() == etcdLeader.GetMemberId()
})
}
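A plausible reason the test's post helper now takes a string rather than an io.Reader: the request is retried inside `testutil.WaitUntil`, and a `bytes.Buffer` is drained by the first attempt, so any retry would send an empty body. Rebuilding the reader from the string on every attempt keeps retries identical; a standalone illustration of the pitfall:

```go
package main

import (
	"bytes"
	"fmt"
	"io/ioutil"
)

func main() {
	buf := bytes.NewBufferString(`{"leader-priority": -1}`)
	first, _ := ioutil.ReadAll(buf)
	second, _ := ioutil.ReadAll(buf) // already drained: a retry would post nothing
	fmt.Printf("first=%q second=%q\n", first, second)

	// Rebuilding the reader from the string on each attempt keeps retries identical.
	body := `{"leader-priority": -1}`
	again, _ := ioutil.ReadAll(bytes.NewBufferString(body))
	fmt.Printf("again=%q\n", again)
}
```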
10 changes: 10 additions & 0 deletions server/api/operator.go
@@ -211,6 +211,16 @@ func (h *operatorHandler) Post(w http.ResponseWriter, r *http.Request) {
h.r.JSON(w, http.StatusInternalServerError, err.Error())
return
}
case "scatter-region":
regionID, ok := input["region_id"].(float64)
if !ok {
h.r.JSON(w, http.StatusBadRequest, "missing region id")
return
}
if err := h.AddScatterRegionOperator(uint64(regionID)); err != nil {
h.r.JSON(w, http.StatusInternalServerError, err.Error())
return
}
default:
h.r.JSON(w, http.StatusBadRequest, "unknown operator")
return
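One detail worth noting in the handler above: the payload is decoded into a `map[string]interface{}`, and `encoding/json` stores every JSON number in an `interface{}` as `float64`, which is why the code asserts `float64` and then converts to `uint64`. A quick standalone check of that behavior:

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	var input map[string]interface{}
	if err := json.Unmarshal([]byte(`{"name":"scatter-region","region_id":42}`), &input); err != nil {
		panic(err)
	}
	// JSON numbers land in an interface{} as float64, so the handler asserts
	// float64 first and only then converts to uint64.
	id, ok := input["region_id"].(float64)
	fmt.Println(uint64(id), ok) // 42 true
}
```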
16 changes: 6 additions & 10 deletions server/api/store.go
@@ -66,7 +66,7 @@ const (
downStateName = "Down"
)

func newStoreInfo(store *core.StoreInfo, maxStoreDownTime time.Duration) *StoreInfo {
func newStoreInfo(opt *server.ScheduleConfig, store *core.StoreInfo) *StoreInfo {
s := &StoreInfo{
Store: &MetaStore{
Store: store.Store,
@@ -77,11 +77,11 @@ func newStoreInfo(store *core.StoreInfo, maxStoreDownTime time.Duration) *StoreI
Available: typeutil.ByteSize(store.Stats.GetAvailable()),
LeaderCount: store.LeaderCount,
LeaderWeight: store.LeaderWeight,
LeaderScore: store.LeaderScore(),
LeaderScore: store.LeaderScore(0),
LeaderSize: store.LeaderSize,
RegionCount: store.RegionCount,
RegionWeight: store.RegionWeight,
RegionScore: store.RegionScore(),
RegionScore: store.RegionScore(opt.HighSpaceRatio, opt.LowSpaceRatio, 0),
RegionSize: store.RegionSize,
SendingSnapCount: store.Stats.GetSendingSnapCount(),
ReceivingSnapCount: store.Stats.GetReceivingSnapCount(),
@@ -103,7 +103,7 @@ func newStoreInfo(store *core.StoreInfo, maxStoreDownTime time.Duration) *StoreI
}

if store.State == metapb.StoreState_Up {
if store.DownTime() > maxStoreDownTime {
if store.DownTime() > opt.MaxStoreDownTime.Duration {
s.Store.StateName = downStateName
} else if store.IsDisconnected() {
s.Store.StateName = disconnectedName
@@ -137,8 +137,6 @@ func (h *storeHandler) Get(w http.ResponseWriter, r *http.Request) {
return
}

maxStoreDownTime := h.svr.GetScheduleConfig().MaxStoreDownTime.Duration

vars := mux.Vars(r)
storeIDStr := vars["id"]
storeID, err := strconv.ParseUint(storeIDStr, 10, 64)
@@ -153,7 +151,7 @@
return
}

storeInfo := newStoreInfo(store, maxStoreDownTime)
storeInfo := newStoreInfo(h.svr.GetScheduleConfig(), store)
h.rd.JSON(w, http.StatusOK, storeInfo)
}

@@ -324,8 +322,6 @@ func (h *storesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

maxStoreDownTime := h.svr.GetScheduleConfig().MaxStoreDownTime.Duration

stores := cluster.GetStores()
StoresInfo := &StoresInfo{
Stores: make([]*StoreInfo, 0, len(stores)),
@@ -345,7 +341,7 @@
return
}

storeInfo := newStoreInfo(store, maxStoreDownTime)
storeInfo := newStoreInfo(h.svr.GetScheduleConfig(), store)
StoresInfo.Stores = append(StoresInfo.Stores, storeInfo)
}
StoresInfo.Count = len(StoresInfo.Stores)
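The repeated call-site change in this file (and in label.go and trend.go) replaces the single `maxStoreDownTime` argument with the whole `*server.ScheduleConfig`, since `LeaderScore` and `RegionScore` now also need knobs like `HighSpaceRatio` and `LowSpaceRatio`. A minimal sketch of the shape of that refactor, using hypothetical stand-in types rather than PD's real ones:

```go
package main

import (
	"fmt"
	"time"
)

// Hypothetical stand-ins for PD's ScheduleConfig and StoreInfo.
type ScheduleConfig struct {
	MaxStoreDownTime time.Duration
	HighSpaceRatio   float64
	LowSpaceRatio    float64
}

type Store struct{ DownTime time.Duration }

// Before, each new score input meant another parameter at every call site.
// After, callers pass the config once and the constructor picks what it needs.
func newStoreState(opt *ScheduleConfig, s *Store) string {
	if s.DownTime > opt.MaxStoreDownTime {
		return "Down"
	}
	return "Up"
}

func main() {
	opt := &ScheduleConfig{MaxStoreDownTime: time.Hour, HighSpaceRatio: 0.6, LowSpaceRatio: 0.8}
	fmt.Println(newStoreState(opt, &Store{DownTime: 2 * time.Hour})) // Down
}
```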
6 changes: 3 additions & 3 deletions server/api/store_test.go
@@ -270,14 +270,14 @@ func (s *testStoreSuite) TestDownState(c *C) {
Stats: &pdpb.StoreStats{},
LastHeartbeatTS: time.Now(),
}
storeInfo := newStoreInfo(store, time.Hour)
storeInfo := newStoreInfo(s.svr.GetScheduleConfig(), store)
c.Assert(storeInfo.Store.StateName, Equals, metapb.StoreState_Up.String())

store.LastHeartbeatTS = time.Now().Add(-time.Minute * 2)
storeInfo = newStoreInfo(store, time.Hour)
storeInfo = newStoreInfo(s.svr.GetScheduleConfig(), store)
c.Assert(storeInfo.Store.StateName, Equals, disconnectedName)

store.LastHeartbeatTS = time.Now().Add(-time.Hour * 2)
storeInfo = newStoreInfo(store, time.Hour)
storeInfo = newStoreInfo(s.svr.GetScheduleConfig(), store)
c.Assert(storeInfo.Store.StateName, Equals, downStateName)
}
4 changes: 1 addition & 3 deletions server/api/trend.go
@@ -107,8 +107,6 @@ func (h *trendHandler) Handle(w http.ResponseWriter, r *http.Request) {
}

func (h *trendHandler) getTrendStores() ([]trendStore, error) {
maxStoreDownTime := h.svr.GetScheduleConfig().MaxStoreDownTime.Duration

var readStats, writeStats core.StoreHotRegionsStat
if hotRead := h.GetHotReadRegions(); hotRead != nil {
readStats = hotRead.AsLeader
@@ -123,7 +121,7 @@

trendStores := make([]trendStore, 0, len(stores))
for _, store := range stores {
info := newStoreInfo(store, maxStoreDownTime)
info := newStoreInfo(h.svr.GetScheduleConfig(), store)
s := trendStore{
ID: info.Store.GetId(),
Address: info.Store.GetAddress(),