Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

some fixes to release 3.0 #1612

Merged
merged 4 commits into from
Jul 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# PD Change Log

## Unreleased

+ Fix the issue about the limit of the hot region [#1552](https://github.com/pingcap/pd/pull/1552)
+ Add a option about grpc gateway [#1596](https://github.com/pingcap/pd/pull/1596)
+ Add the missing schedule config items [#1601](https://github.com/pingcap/pd/pull/1601)

## v3.0.0

+ Support re-creating a cluster from a single node
Expand Down
51 changes: 51 additions & 0 deletions server/api/etcd_api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"encoding/json"
"net/http"
"strings"

. "github.com/pingcap/check"
)

var _ = Suite(&testEtcdAPISuite{})

type testEtcdAPISuite struct {
hc *http.Client
}

func (s *testEtcdAPISuite) SetUpSuite(c *C) {
s.hc = newHTTPClient()
}

func (s *testEtcdAPISuite) TestGRPCGateway(c *C) {
svr, clean := mustNewServer(c)
defer clean()

addr := svr.GetConfig().ClientUrls + "/v3/kv/put"
putKey := map[string]string{"key": "Zm9v", "value": "YmFy"}
v, _ := json.Marshal(putKey)
err := postJSON(addr, v)
c.Assert(err, IsNil)
addr = svr.GetConfig().ClientUrls + "/v3/kv/range"
getKey := map[string]string{"key": "Zm9v"}
v, _ = json.Marshal(getKey)
err = postJSON(addr, v, func(res []byte) bool {
c.Assert(strings.Contains(string(res), "Zm9v"), IsTrue)
return true
})
c.Assert(err, IsNil)
}
11 changes: 9 additions & 2 deletions server/api/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,26 @@ func readJSON(r io.ReadCloser, data interface{}) error {
return nil
}

func postJSON(url string, data []byte) error {
func postJSON(url string, data []byte, checkOpts ...func(res []byte) bool) error {
resp, err := dialClient.Post(url, "application/json", bytes.NewBuffer(data))
if err != nil {
return errors.WithStack(err)
}
res, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
defer resp.Body.Close()

if err != nil {
return err
}

if resp.StatusCode != http.StatusOK {
return errors.New(string(res))
}
for _, opt := range checkOpts {
if !opt(res) {
return errors.New("check failed")
}
}
return nil
}

Expand Down
12 changes: 9 additions & 3 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ type Config struct {
AdvertiseClientUrls string `toml:"advertise-client-urls" json:"advertise-client-urls"`
AdvertisePeerUrls string `toml:"advertise-peer-urls" json:"advertise-peer-urls"`

Name string `toml:"name" json:"name"`
DataDir string `toml:"data-dir" json:"data-dir"`
ForceNewCluster bool `json:"force-new-cluster"`
Name string `toml:"name" json:"name"`
DataDir string `toml:"data-dir" json:"data-dir"`
ForceNewCluster bool `json:"force-new-cluster"`
EnableGRPCGateway bool `json:"enable-grpc-gateway"`

InitialCluster string `toml:"initial-cluster" json:"initial-cluster"`
InitialClusterState string `toml:"initial-cluster-state" json:"initial-cluster-state"`
Expand Down Expand Up @@ -199,6 +200,7 @@ const (

defaultUseRegionStorage = true
defaultStrictlyMatchLabel = false
defaultEnableGRPCGateway = true
)

func adjustString(v *string, defValue string) {
Expand Down Expand Up @@ -427,6 +429,9 @@ func (c *Config) Adjust(meta *toml.MetaData) error {
if !configMetaData.IsDefined("enable-prevote") {
c.PreVote = true
}
if !configMetaData.IsDefined("enable-grpc-gateway") {
c.EnableGRPCGateway = defaultEnableGRPCGateway
}
return nil
}

Expand Down Expand Up @@ -872,6 +877,7 @@ func (c *Config) genEmbedEtcdConfig() (*embed.Config, error) {
cfg.PeerTLSInfo.KeyFile = c.Security.KeyPath
cfg.ForceNewCluster = c.ForceNewCluster
cfg.ZapLoggerBuilder = embed.NewZapCoreLoggerBuilder(c.logger, c.logger.Core(), c.logProps.Syncer)
cfg.EnableGRPCGateway = c.EnableGRPCGateway
cfg.Logger = "zap"
var err error

Expand Down
3 changes: 3 additions & 0 deletions server/schedulers/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -909,6 +909,9 @@ func (s *testBalanceHotWriteRegionSchedulerSuite) TestBalance(c *C) {
opt.HotRegionScheduleLimit = mockoption.NewScheduleOptions().HotRegionScheduleLimit
opt.RegionScheduleLimit = 0
c.Assert(hb.Schedule(tc), HasLen, 1)
// Always produce operator
c.Assert(hb.Schedule(tc), HasLen, 1)
c.Assert(hb.Schedule(tc), HasLen, 1)

//| region_id | leader_store | follower_store | follower_store | written_bytes |
//|-----------|--------------|----------------|----------------|---------------|
Expand Down
14 changes: 4 additions & 10 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,20 +128,13 @@ func (h *balanceHotRegionsScheduler) IsScheduleAllowed(cluster schedule.Cluster)
return h.allowBalanceLeader(cluster) || h.allowBalanceRegion(cluster)
}

func min(a, b uint64) uint64 {
if a < b {
return a
}
return b
}

func (h *balanceHotRegionsScheduler) allowBalanceLeader(cluster schedule.Cluster) bool {
return h.opController.OperatorCount(schedule.OpHotRegion) < min(h.leaderLimit, cluster.GetHotRegionScheduleLimit()) &&
return h.opController.OperatorCount(schedule.OpHotRegion) < minUint64(h.leaderLimit, cluster.GetHotRegionScheduleLimit()) &&
h.opController.OperatorCount(schedule.OpLeader) < cluster.GetLeaderScheduleLimit()
}

func (h *balanceHotRegionsScheduler) allowBalanceRegion(cluster schedule.Cluster) bool {
return h.opController.OperatorCount(schedule.OpHotRegion) < min(h.peerLimit, cluster.GetHotRegionScheduleLimit())
return h.opController.OperatorCount(schedule.OpHotRegion) < minUint64(h.peerLimit, cluster.GetHotRegionScheduleLimit())
}

func (h *balanceHotRegionsScheduler) Schedule(cluster schedule.Cluster) []*schedule.Operator {
Expand Down Expand Up @@ -443,7 +436,8 @@ func (h *balanceHotRegionsScheduler) adjustBalanceLimit(storeID uint64, storesSt

avgRegionCount := hotRegionTotalCount / float64(len(storesStat))
// Multiplied by hotRegionLimitFactor to avoid transfer back and forth
return uint64((float64(srcStoreStatistics.RegionsStat.Len()) - avgRegionCount) * hotRegionLimitFactor)
limit := uint64((float64(srcStoreStatistics.RegionsStat.Len()) - avgRegionCount) * hotRegionLimitFactor)
return maxUint64(limit, 1)
}

func (h *balanceHotRegionsScheduler) GetHotReadStatus() *statistics.StoreHotRegionInfos {
Expand Down
22 changes: 21 additions & 1 deletion server/statistics/store_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,35 @@ const (
)

// ScheduleOptions is an interface to access configurations.
// TODO: merge the Options to schedule.Options
type ScheduleOptions interface {
GetLocationLabels() []string
GetMaxStoreDownTime() time.Duration

GetLowSpaceRatio() float64
GetHighSpaceRatio() float64
GetTolerantSizeRatio() float64
GetStoreBalanceRate() float64

GetSchedulerMaxWaitingOperator() uint64
GetLeaderScheduleLimit(name string) uint64
GetRegionScheduleLimit(name string) uint64
GetReplicaScheduleLimit(name string) uint64
GetMergeScheduleLimit(name string) uint64
GetHotRegionScheduleLimit(name string) uint64
GetMaxReplicas(name string) int
GetHotRegionCacheHitsThreshold() int
GetMaxSnapshotCount() uint64
GetMaxPendingPeerCount() uint64
GetMaxMergeRegionSize() uint64
GetMaxMergeRegionKeys() uint64

IsRaftLearnerEnabled() bool
IsMakeUpReplicaEnabled() bool
IsRemoveExtraReplicaEnabled() bool
IsRemoveDownReplicaEnabled() bool
IsReplaceOfflineReplicaEnabled() bool

GetMaxStoreDownTime() time.Duration
}

type storeStatistics struct {
Expand Down Expand Up @@ -160,6 +173,13 @@ func (s *storeStatistics) Collect() {
configs["high_space_ratio"] = float64(s.opt.GetHighSpaceRatio())
configs["low_space_ratio"] = float64(s.opt.GetLowSpaceRatio())
configs["tolerant_size_ratio"] = float64(s.opt.GetTolerantSizeRatio())
configs["store-balance-rate"] = float64(s.opt.GetStoreBalanceRate())
configs["hot-region-schedule-limit"] = float64(s.opt.GetHotRegionScheduleLimit(s.namespace))
configs["hot-region-cache-hits-threshold"] = float64(s.opt.GetHotRegionCacheHitsThreshold())
configs["max-pending-peer-count"] = float64(s.opt.GetMaxPendingPeerCount())
configs["max-snapshot-count"] = float64(s.opt.GetMaxSnapshotCount())
configs["max-merge-region-size"] = float64(s.opt.GetMaxMergeRegionSize())
configs["max-merge-region-keys"] = float64(s.opt.GetMaxMergeRegionKeys())

var disableMakeUpReplica, disableLearner, disableRemoveDownReplica, disableRemoveExtraReplica, disableReplaceOfflineReplica float64
if !s.opt.IsMakeUpReplicaEnabled() {
Expand Down