Skip to content

Commit

Permalink
Cherry-pick commits to release-2.0. (#1050)
Browse files Browse the repository at this point in the history
* server: skip check a region if there is already a pending operator. (#1029)

* pdctl, api, schedule: pdctl supports scatter region. (#1028)

* add change log (#1024)

* scheduler: hot write scheduler randomly select the balance strategy (#1034)

* *: new store region score function for balance (#1014)

* core: adjust capacity to fit for more cases (#1035)

* check config validation (#1036)

* *: add metrics for hotspot cache (#1027)

* Refine install instructions. (#1041)

These extra steps can help new users who are perhaps not quite so familiar with Go setup and conventions.

* *: add change log for 2.0 GA (#1038)

* *: add change log for ga.

* scheduler: adjust metrics (#1042)

* fix parse error of config.toml (#1043)

* server: resign pd leader when it is not same as etcd leader. (#1039)

* server: fix the valid scheduler was deleted when start coordinator (#1045)

* server: fix the valid scheduler was deleted when start coordinator
  • Loading branch information
disksing authored May 8, 2018
1 parent 9b824d2 commit 677f04a
Show file tree
Hide file tree
Showing 46 changed files with 1,038 additions and 681 deletions.
38 changes: 38 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# PD Change Log

## v2.0.0-GA
### New Feature
* Support using pd-ctl to scatter specified Regions for manually adjusting hotspot Regions in some cases
### Improvements
* Improve configuration check rules to prevent unreasonable scheduling configuration
* Optimize the scheduling strategy when a TiKV node has insufficient space so as to prevent the disk from being fully occupied
* Optimize hot-region scheduler execution efficiency and add more metrics
* Optimize Region health check logic to avoid generating redundant schedule operators

## v2.0.0-rc.5
### New Feature
* Support adding the learner node
### Improvements
* Optimize the Balance Region Scheduler to reduce scheduling overhead
* Adjust the default value of `schedule-limit` configuration
* Fix the compatibility issue when adding a new scheduler
### Bug Fix
* Fix the issue of allocating IDs frequently

## v2.0.0-rc.4
### New Feature
* Support splitting Region manually to handle the hot spot in a single Region
### Improvement
* Optimize metrics
### Bug Fix
* Fix the issue that the label property is not displayed when `pdctl` runs `config show all`

## v2.0.0-rc3
### New Feature
* Support Region Merge, to merge empty Regions or small Regions after deleting data
### Improvements
* Ignore the nodes that have a lot of pending peers during adding replicas, to improve the speed of restoring replicas or making nodes offline
* Optimize the scheduling speed of leader balance in scenarios of unbalanced resources within different labels
* Add more statistics about abnormal Regions
### Bug Fix
* Fix the frequent scheduling issue caused by a large number of empty Regions
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ PD supports distribution and fault-tolerance by embedding [etcd](https://github.
## Build

1. Make sure [*Go*](https://golang.org/) (version 1.8+) is installed.
2. Use `make` to install PD. PD is installed in the `bin` directory.
2. Ensure your `$GOPATH` is set. (For example, `export GOPATH=$HOME/go`)
3. Clone the repository with `git clone [email protected]:pingcap/pd.git $GOPATH/src/github.com/pingcap/pd`.
4. Use `make` to install PD. PD is installed in the `bin` directory.

## Usage

Expand Down
13 changes: 2 additions & 11 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import (
"flag"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"

"github.com/grpc-ecosystem/go-grpc-prometheus"
Expand Down Expand Up @@ -51,19 +49,12 @@ func main() {
case flag.ErrHelp:
os.Exit(0)
default:
log.Fatalf("parse cmd flags error: %s\n", err)
}

dataDir, err := filepath.Abs(cfg.DataDir)
logFile, err := filepath.Abs(cfg.Log.File.Filename)
rel, err := filepath.Rel(dataDir, filepath.Dir(logFile))
if !strings.HasPrefix(rel, "..") {
log.Fatalf("initialize logger error: log directory shouldn't be the subdirectory of data directory")
log.Fatalf("parse cmd flags error: %s\n", errors.ErrorStack(err))
}

err = logutil.InitLogger(&cfg.Log)
if err != nil {
log.Fatalf("initialize logger error: %s\n", err)
log.Fatalf("initialize logger error: %s\n", errors.ErrorStack(err))
}

server.LogPDInfo()
Expand Down
2 changes: 1 addition & 1 deletion conf/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ leader-schedule-limit = 4
region-schedule-limit = 4
replica-schedule-limit = 8
merge-schedule-limit = 8
tolerant-size-ratio = 2.5
tolerant-size-ratio = 5.0

# customized schedulers, the format is as below
# if empty, it will use balance-leader, balance-region, hot-region as default
Expand Down
29 changes: 29 additions & 0 deletions pdctl/command/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func NewAddOperatorCommand() *cobra.Command {
c.AddCommand(NewRemovePeerCommand())
c.AddCommand(NewMergeRegionCommand())
c.AddCommand(NewSplitRegionCommand())
c.AddCommand(NewScatterRegionCommand())
return c
}

Expand Down Expand Up @@ -286,6 +287,34 @@ func splitRegionCommandFunc(cmd *cobra.Command, args []string) {
postJSON(cmd, operatorsPrefix, input)
}

// NewScatterRegionCommand returns a command to scatter a region.
func NewScatterRegionCommand() *cobra.Command {
c := &cobra.Command{
Use: "scatter-region <region_id>",
Short: "scatter a region",
Run: scatterRegionCommandFunc,
}
return c
}

func scatterRegionCommandFunc(cmd *cobra.Command, args []string) {
if len(args) != 1 {
fmt.Println(cmd.UsageString())
return
}

ids, err := parseUint64s(args)
if err != nil {
fmt.Println(err)
return
}

input := make(map[string]interface{})
input["name"] = cmd.Name()
input["region_id"] = ids[0]
postJSON(cmd, operatorsPrefix, input)
}

// NewRemoveOperatorCommand returns a command to remove operators.
func NewRemoveOperatorCommand() *cobra.Command {
c := &cobra.Command{
Expand Down
21 changes: 14 additions & 7 deletions server/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,22 @@ func (h *confHandler) Post(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
err = json.Unmarshal(data, &config.Schedule)
if err != nil {
if err := json.Unmarshal(data, &config.Schedule); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
err = json.Unmarshal(data, &config.Replication)
if err != nil {
if err := json.Unmarshal(data, &config.Replication); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
if err := h.svr.SetScheduleConfig(config.Schedule); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
if err := h.svr.SetReplicationConfig(config.Replication); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.svr.SetScheduleConfig(config.Schedule)
h.svr.SetReplicationConfig(config.Replication)
h.rd.JSON(w, http.StatusOK, nil)
}

Expand All @@ -76,7 +80,10 @@ func (h *confHandler) SetSchedule(w http.ResponseWriter, r *http.Request) {
return
}

h.svr.SetScheduleConfig(*config)
if err := h.svr.SetScheduleConfig(*config); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, nil)
}

Expand Down
4 changes: 1 addition & 3 deletions server/api/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ func (h *labelsHandler) GetStores(w http.ResponseWriter, r *http.Request) {
return
}

maxDownTime := h.svr.GetScheduleConfig().MaxStoreDownTime.Duration

stores := cluster.GetStores()
storesInfo := &StoresInfo{
Stores: make([]*StoreInfo, 0, len(stores)),
Expand All @@ -87,7 +85,7 @@ func (h *labelsHandler) GetStores(w http.ResponseWriter, r *http.Request) {
return
}

storeInfo := newStoreInfo(store, maxDownTime)
storeInfo := newStoreInfo(h.svr.GetScheduleConfig(), store)
storesInfo.Stores = append(storesInfo.Stores, storeInfo)
}
storesInfo.Count = len(storesInfo.Stores)
Expand Down
34 changes: 26 additions & 8 deletions server/api/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"math/rand"
"net/http"
Expand Down Expand Up @@ -211,14 +210,14 @@ func (s *testMemberAPISuite) TestLeaderResign(c *C) {
leader1, err := svrs[0].GetLeader()
c.Assert(err, IsNil)

s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/leader/resign", nil)
s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/leader/resign", "")
leader2 := s.waitLeaderChange(c, svrs[0], leader1)
s.post(c, addrs[leader2.GetMemberId()]+apiPrefix+"/api/v1/leader/transfer/"+leader1.GetName(), nil)
s.post(c, addrs[leader2.GetMemberId()]+apiPrefix+"/api/v1/leader/transfer/"+leader1.GetName(), "")
leader3 := s.waitLeaderChange(c, svrs[0], leader2)
c.Assert(leader3.GetMemberId(), Equals, leader1.GetMemberId())
}

func (s *testMemberAPISuite) TestEtcdLeaderPriority(c *C) {
func (s *testMemberAPISuite) TestLeaderPriority(c *C) {
cfgs, svrs, clean := mustNewCluster(c, 3)
defer clean()

Expand All @@ -229,17 +228,24 @@ func (s *testMemberAPISuite) TestEtcdLeaderPriority(c *C) {

leader1, err := s.getEtcdLeader(svrs[0])
c.Assert(err, IsNil)
s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/members/name/"+leader1.GetName(), bytes.NewBufferString(`{"leader-priority": -1}`))
s.waitLeaderSync(c, svrs[0], leader1)
s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/members/name/"+leader1.GetName(), `{"leader-priority": -1}`)
leader2 := s.waitEtcdLeaderChange(c, svrs[0], leader1)
s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/members/name/"+leader1.GetName(), bytes.NewBufferString(`{"leader-priority": 100}`))
s.waitLeaderSync(c, svrs[0], leader2)
s.post(c, addrs[leader1.GetMemberId()]+apiPrefix+"/api/v1/members/name/"+leader1.GetName(), `{"leader-priority": 100}`)
leader3 := s.waitEtcdLeaderChange(c, svrs[0], leader2)
s.waitLeaderSync(c, svrs[0], leader3)
c.Assert(leader3.GetMemberId(), Equals, leader1.GetMemberId())
}

func (s *testMemberAPISuite) post(c *C, url string, body io.Reader) {
func (s *testMemberAPISuite) post(c *C, url string, body string) {
testutil.WaitUntil(c, func(c *C) bool {
res, err := http.Post(url, "", body)
res, err := http.Post(url, "", bytes.NewBufferString(body))
c.Assert(err, IsNil)
b, err := ioutil.ReadAll(res.Body)
res.Body.Close()
c.Assert(err, IsNil)
c.Logf("post %s, status: %v res: %s", url, res.StatusCode, string(b))
return res.StatusCode == http.StatusOK
})
}
Expand Down Expand Up @@ -288,3 +294,15 @@ func (s *testMemberAPISuite) waitEtcdLeaderChange(c *C, svr *server.Server, old
})
return leader
}

func (s *testMemberAPISuite) waitLeaderSync(c *C, svr *server.Server, etcdLeader *pdpb.Member) {
testutil.WaitUntil(c, func(c *C) bool {
leader, err := svr.GetLeader()
if err != nil {
c.Logf("GetLeader err: %v", err)
return false
}
c.Logf("leader is %v", leader.GetMemberId())
return leader.GetMemberId() == etcdLeader.GetMemberId()
})
}
10 changes: 10 additions & 0 deletions server/api/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,16 @@ func (h *operatorHandler) Post(w http.ResponseWriter, r *http.Request) {
h.r.JSON(w, http.StatusInternalServerError, err.Error())
return
}
case "scatter-region":
regionID, ok := input["region_id"].(float64)
if !ok {
h.r.JSON(w, http.StatusBadRequest, "missing region id")
return
}
if err := h.AddScatterRegionOperator(uint64(regionID)); err != nil {
h.r.JSON(w, http.StatusInternalServerError, err.Error())
return
}
default:
h.r.JSON(w, http.StatusBadRequest, "unknown operator")
return
Expand Down
16 changes: 6 additions & 10 deletions server/api/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ const (
downStateName = "Down"
)

func newStoreInfo(store *core.StoreInfo, maxStoreDownTime time.Duration) *StoreInfo {
func newStoreInfo(opt *server.ScheduleConfig, store *core.StoreInfo) *StoreInfo {
s := &StoreInfo{
Store: &MetaStore{
Store: store.Store,
Expand All @@ -77,11 +77,11 @@ func newStoreInfo(store *core.StoreInfo, maxStoreDownTime time.Duration) *StoreI
Available: typeutil.ByteSize(store.Stats.GetAvailable()),
LeaderCount: store.LeaderCount,
LeaderWeight: store.LeaderWeight,
LeaderScore: store.LeaderScore(),
LeaderScore: store.LeaderScore(0),
LeaderSize: store.LeaderSize,
RegionCount: store.RegionCount,
RegionWeight: store.RegionWeight,
RegionScore: store.RegionScore(),
RegionScore: store.RegionScore(opt.HighSpaceRatio, opt.LowSpaceRatio, 0),
RegionSize: store.RegionSize,
SendingSnapCount: store.Stats.GetSendingSnapCount(),
ReceivingSnapCount: store.Stats.GetReceivingSnapCount(),
Expand All @@ -103,7 +103,7 @@ func newStoreInfo(store *core.StoreInfo, maxStoreDownTime time.Duration) *StoreI
}

if store.State == metapb.StoreState_Up {
if store.DownTime() > maxStoreDownTime {
if store.DownTime() > opt.MaxStoreDownTime.Duration {
s.Store.StateName = downStateName
} else if store.IsDisconnected() {
s.Store.StateName = disconnectedName
Expand Down Expand Up @@ -137,8 +137,6 @@ func (h *storeHandler) Get(w http.ResponseWriter, r *http.Request) {
return
}

maxStoreDownTime := h.svr.GetScheduleConfig().MaxStoreDownTime.Duration

vars := mux.Vars(r)
storeIDStr := vars["id"]
storeID, err := strconv.ParseUint(storeIDStr, 10, 64)
Expand All @@ -153,7 +151,7 @@ func (h *storeHandler) Get(w http.ResponseWriter, r *http.Request) {
return
}

storeInfo := newStoreInfo(store, maxStoreDownTime)
storeInfo := newStoreInfo(h.svr.GetScheduleConfig(), store)
h.rd.JSON(w, http.StatusOK, storeInfo)
}

Expand Down Expand Up @@ -324,8 +322,6 @@ func (h *storesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

maxStoreDownTime := h.svr.GetScheduleConfig().MaxStoreDownTime.Duration

stores := cluster.GetStores()
StoresInfo := &StoresInfo{
Stores: make([]*StoreInfo, 0, len(stores)),
Expand All @@ -345,7 +341,7 @@ func (h *storesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

storeInfo := newStoreInfo(store, maxStoreDownTime)
storeInfo := newStoreInfo(h.svr.GetScheduleConfig(), store)
StoresInfo.Stores = append(StoresInfo.Stores, storeInfo)
}
StoresInfo.Count = len(StoresInfo.Stores)
Expand Down
6 changes: 3 additions & 3 deletions server/api/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,14 +270,14 @@ func (s *testStoreSuite) TestDownState(c *C) {
Stats: &pdpb.StoreStats{},
LastHeartbeatTS: time.Now(),
}
storeInfo := newStoreInfo(store, time.Hour)
storeInfo := newStoreInfo(s.svr.GetScheduleConfig(), store)
c.Assert(storeInfo.Store.StateName, Equals, metapb.StoreState_Up.String())

store.LastHeartbeatTS = time.Now().Add(-time.Minute * 2)
storeInfo = newStoreInfo(store, time.Hour)
storeInfo = newStoreInfo(s.svr.GetScheduleConfig(), store)
c.Assert(storeInfo.Store.StateName, Equals, disconnectedName)

store.LastHeartbeatTS = time.Now().Add(-time.Hour * 2)
storeInfo = newStoreInfo(store, time.Hour)
storeInfo = newStoreInfo(s.svr.GetScheduleConfig(), store)
c.Assert(storeInfo.Store.StateName, Equals, downStateName)
}
4 changes: 1 addition & 3 deletions server/api/trend.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,6 @@ func (h *trendHandler) Handle(w http.ResponseWriter, r *http.Request) {
}

func (h *trendHandler) getTrendStores() ([]trendStore, error) {
maxStoreDownTime := h.svr.GetScheduleConfig().MaxStoreDownTime.Duration

var readStats, writeStats core.StoreHotRegionsStat
if hotRead := h.GetHotReadRegions(); hotRead != nil {
readStats = hotRead.AsLeader
Expand All @@ -123,7 +121,7 @@ func (h *trendHandler) getTrendStores() ([]trendStore, error) {

trendStores := make([]trendStore, 0, len(stores))
for _, store := range stores {
info := newStoreInfo(store, maxStoreDownTime)
info := newStoreInfo(h.svr.GetScheduleConfig(), store)
s := trendStore{
ID: info.Store.GetId(),
Address: info.Store.GetAddress(),
Expand Down
Loading

0 comments on commit 677f04a

Please sign in to comment.