Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

config: make original HTTP API compatible with config manager #2080

Merged
merged 8 commits into from
Feb 5, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg=
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pelletier/go-toml v1.3.0 h1:e5+lF2E4Y2WCIxBefVowBuB0iHrUH4HZ8q+6mGF7fJc=
github.com/pelletier/go-toml v1.3.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUruD3k1mMwo=
github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d h1:U+PMnTlV2tu7RuMK5etusZG3Cf+rpow5hqQByeCzJ2g=
github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d/go.mod h1:lXfE4PvvTW5xOjO6Mba8zDPyw8M93B6AQ7frTGnMlA8=
Expand Down Expand Up @@ -333,7 +334,6 @@ golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73r
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181005035420-146acd28ed58 h1:otZG8yDCO4LVps5+9bxOeNiCvgmOyt96J3roHTYs7oE=
golang.org/x/net v0.0.0-20181005035420-146acd28ed58/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down Expand Up @@ -404,7 +404,6 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f h1:FU37niK8AQ59mHcskRyQL7H0ErSeNh650vdcj8HqdSI=
google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
Expand Down
5 changes: 5 additions & 0 deletions pkg/typeutil/duration.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,8 @@ func (d *Duration) UnmarshalText(text []byte) error {
d.Duration, err = time.ParseDuration(string(text))
return errors.WithStack(err)
}

// MarshalText returns the duration as a JSON string.
func (d Duration) MarshalText() ([]byte, error) {
return []byte(d.String()), nil
}
2 changes: 1 addition & 1 deletion pkg/typeutil/string_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (s *StringSlice) UnmarshalJSON(text []byte) error {
return errors.WithStack(err)
}
if len(data) == 0 {
*s = nil
*s = []string{}
return nil
}
*s = strings.Split(data, ",")
Expand Down
2 changes: 1 addition & 1 deletion pkg/typeutil/string_slice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (s *testStringSliceSuite) TestJSON(c *C) {
}

func (s *testStringSliceSuite) TestEmpty(c *C) {
var ss StringSlice
ss := StringSlice([]string{})
b, err := json.Marshal(ss)
c.Assert(err, IsNil)
c.Assert(string(b), Equals, "\"\"")
Expand Down
131 changes: 131 additions & 0 deletions server/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@ import (
"reflect"
"strings"

"github.com/BurntSushi/toml"
"github.com/pingcap/errcode"
"github.com/pingcap/kvproto/pkg/configpb"
"github.com/pingcap/log"
"github.com/pingcap/pd/pkg/apiutil"
"github.com/pingcap/pd/server"
"github.com/pingcap/pd/server/config"
"github.com/pkg/errors"
"github.com/unrolled/render"
"go.uber.org/zap"
)

type confHandler struct {
Expand All @@ -46,6 +50,27 @@ func (h *confHandler) Get(w http.ResponseWriter, r *http.Request) {
}

func (h *confHandler) Post(w http.ResponseWriter, r *http.Request) {
if h.svr.GetConfig().EnableConfigManager {
client := h.svr.GetConfigClient()
if client == nil {
h.rd.JSON(w, http.StatusServiceUnavailable, "no leader")
}
cm := h.svr.GetConfigManager()
m := make(map[string]interface{})
json.NewDecoder(r.Body).Decode(&m)
entries, err := transToEntries(m)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
err = redirectUpdateReq(h.svr.Context(), client, cm, entries)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, nil)
return
}
config := h.svr.GetConfig()
data, err := ioutil.ReadAll(r.Body)
r.Body.Close()
Expand Down Expand Up @@ -139,6 +164,27 @@ func (h *confHandler) GetSchedule(w http.ResponseWriter, r *http.Request) {
}

func (h *confHandler) SetSchedule(w http.ResponseWriter, r *http.Request) {
if h.svr.GetConfig().EnableConfigManager {
client := h.svr.GetConfigClient()
if client == nil {
h.rd.JSON(w, http.StatusServiceUnavailable, "no leader")
}
cm := h.svr.GetConfigManager()
m := make(map[string]interface{})
json.NewDecoder(r.Body).Decode(&m)
entries, err := transToEntries(m)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
err = redirectUpdateReq(h.svr.Context(), client, cm, entries)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, nil)
return
}
config := h.svr.GetScheduleConfig()
if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &config); err != nil {
return
Expand All @@ -156,6 +202,27 @@ func (h *confHandler) GetReplication(w http.ResponseWriter, r *http.Request) {
}

func (h *confHandler) SetReplication(w http.ResponseWriter, r *http.Request) {
if h.svr.GetConfig().EnableConfigManager {
client := h.svr.GetConfigClient()
if client == nil {
h.rd.JSON(w, http.StatusServiceUnavailable, "no leader")
}
cm := h.svr.GetConfigManager()
m := make(map[string]interface{})
json.NewDecoder(r.Body).Decode(&m)
entries, err := transToEntries(m)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
err = redirectUpdateReq(h.svr.Context(), client, cm, entries)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, nil)
return
}
config := h.svr.GetReplicationConfig()
if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &config); err != nil {
return
Expand All @@ -177,6 +244,53 @@ func (h *confHandler) SetLabelProperty(w http.ResponseWriter, r *http.Request) {
if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil {
return
}

if h.svr.GetConfig().EnableConfigManager {
client := h.svr.GetConfigClient()
if client == nil {
h.rd.JSON(w, http.StatusServiceUnavailable, "no leader")
}
cm := h.svr.GetConfigManager()
typ := input["type"]
labelKey, labelValue := input["label-key"], input["label-value"]
cfg := h.svr.GetScheduleOption().LoadLabelPropertyConfig().Clone()
switch input["action"] {
case "set":
for _, l := range cfg[typ] {
if l.Key == labelKey && l.Value == labelValue {
return
}
}
cfg[typ] = append(cfg[typ], config.StoreLabel{Key: labelKey, Value: labelValue})
case "delete":
oldLabels := cfg[typ]
cfg[typ] = []config.StoreLabel{}
for _, l := range oldLabels {
if l.Key == labelKey && l.Value == labelValue {
continue
}
cfg[typ] = append(cfg[typ], l)
}
if len(cfg[typ]) == 0 {
delete(cfg, typ)
}
default:
err := errors.Errorf("unknown action %v", input["action"])
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
var buf bytes.Buffer
toml.NewEncoder(&buf).Encode(cfg)
entries := []*entry{{key: "label-property", value: buf.String()}}
err := redirectUpdateReq(h.svr.Context(), client, cm, entries)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, nil)
return
}

var err error
switch input["action"] {
case "set":
Expand Down Expand Up @@ -207,6 +321,23 @@ func (h *confHandler) SetClusterVersion(w http.ResponseWriter, r *http.Request)
apiutil.ErrorResp(h.rd, w, errcode.NewInvalidInputErr(errors.New("not set cluster-version")))
return
}

if h.svr.GetConfig().EnableConfigManager {
kind := &configpb.ConfigKind{Kind: &configpb.ConfigKind_Global{Global: &configpb.Global{Component: server.Component}}}
v := &configpb.Version{Global: h.svr.GetConfigManager().GlobalCfgs[server.Component].GetVersion()}
entry := &configpb.ConfigEntry{Name: "cluster-version", Value: version}
client := h.svr.GetConfigClient()
if client == nil {
h.rd.JSON(w, http.StatusServiceUnavailable, "no leader")
}
_, _, err := h.svr.GetConfigClient().Update(h.svr.Context(), v, kind, []*configpb.ConfigEntry{entry})
if err != nil {
log.Error("update cluster version meet error", zap.Error(err))
}
h.rd.JSON(w, http.StatusOK, nil)
return
}

err := h.svr.SetClusterVersion(version)
if err != nil {
apiutil.ErrorResp(h.rd, w, errcode.NewInternalErr(err))
Expand Down
12 changes: 11 additions & 1 deletion server/api/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@ type testConfigSuite struct {
}

func (s *testConfigSuite) SetUpSuite(c *C) {
s.svr, s.cleanup = mustNewServer(c)
server.ConfigCheckInterval = 10 * time.Millisecond
s.svr, s.cleanup = mustNewServer(c, func(cfg *config.Config) { cfg.EnableConfigManager = true })
mustWaitLeader(c, []*server.Server{s.svr})

addr := s.svr.GetAddr()
s.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)
// make sure the config client is initialized
time.Sleep(20 * time.Millisecond)
}

func (s *testConfigSuite) TearDownSuite(c *C) {
Expand Down Expand Up @@ -71,6 +74,7 @@ func (s *testConfigSuite) TestConfigAll(c *C) {
err = postJSON(addr, postData)
c.Assert(err, IsNil)

time.Sleep(20 * time.Millisecond)
newCfg := &config.Config{}
err = readJSON(addr, newCfg)
c.Assert(err, IsNil)
Expand All @@ -92,6 +96,7 @@ func (s *testConfigSuite) TestConfigSchedule(c *C) {
err = postJSON(addr, postData)
c.Assert(err, IsNil)

time.Sleep(20 * time.Millisecond)
sc1 := &config.ScheduleConfig{}
c.Assert(readJSON(addr, sc1), IsNil)
c.Assert(*sc, DeepEquals, *sc1)
Expand All @@ -118,6 +123,7 @@ func (s *testConfigSuite) TestConfigReplication(c *C) {
err = postJSON(addr, postData)
c.Assert(err, IsNil)

time.Sleep(20 * time.Millisecond)
rc3 := &config.ReplicationConfig{}
err = readJSON(addr, rc3)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -146,7 +152,9 @@ func (s *testConfigSuite) TestConfigLabelProperty(c *C) {
for _, cmd := range cmds {
err := postJSON(addr, []byte(cmd))
c.Assert(err, IsNil)
time.Sleep(20 * time.Millisecond)
}

cfg = loadProperties()
c.Assert(cfg, HasLen, 2)
c.Assert(cfg["foo"], DeepEquals, []config.StoreLabel{
Expand All @@ -162,7 +170,9 @@ func (s *testConfigSuite) TestConfigLabelProperty(c *C) {
for _, cmd := range cmds {
err := postJSON(addr, []byte(cmd))
c.Assert(err, IsNil)
time.Sleep(20 * time.Millisecond)
}

cfg = loadProperties()
c.Assert(cfg, HasLen, 1)
c.Assert(cfg["foo"], DeepEquals, []config.StoreLabel{{Key: "zone", Value: "cn2"}})
Expand Down
14 changes: 13 additions & 1 deletion server/api/label_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"strings"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -107,7 +108,11 @@ func (s *testLabelsStoreSuite) SetUpSuite(c *C) {
},
}

s.svr, s.cleanup = mustNewServer(c, func(cfg *config.Config) { cfg.Replication.StrictlyMatchLabel = false })
server.ConfigCheckInterval = 10 * time.Millisecond
s.svr, s.cleanup = mustNewServer(c, func(cfg *config.Config) {
cfg.Replication.StrictlyMatchLabel = false
cfg.EnableConfigManager = true
})
mustWaitLeader(c, []*server.Server{s.svr})

addr := s.svr.GetAddr()
Expand All @@ -117,6 +122,8 @@ func (s *testLabelsStoreSuite) SetUpSuite(c *C) {
for _, store := range s.stores {
mustPutStore(c, s.svr, store.Id, store.State, store.Labels)
}
// make sure the config client is initialized
time.Sleep(20 * time.Millisecond)
}

func (s *testLabelsStoreSuite) TearDownSuite(c *C) {
Expand Down Expand Up @@ -183,16 +190,20 @@ type testStrictlyLabelsStoreSuite struct {
}

func (s *testStrictlyLabelsStoreSuite) SetUpSuite(c *C) {
server.ConfigCheckInterval = 10 * time.Millisecond
s.svr, s.cleanup = mustNewServer(c, func(cfg *config.Config) {
cfg.Replication.LocationLabels = []string{"zone", "disk"}
cfg.Replication.StrictlyMatchLabel = true
cfg.EnableConfigManager = true
})
mustWaitLeader(c, []*server.Server{s.svr})

addr := s.svr.GetAddr()
s.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix)

mustBootstrapCluster(c, s.svr)
// make sure the config client is initialized
time.Sleep(20 * time.Millisecond)
}

func (s *testStrictlyLabelsStoreSuite) TestStoreMatch(c *C) {
Expand Down Expand Up @@ -277,6 +288,7 @@ func (s *testStrictlyLabelsStoreSuite) TestStoreMatch(c *C) {

// enable placement rules. Report no error any more.
c.Assert(postJSON(fmt.Sprintf("%s/config", s.urlPrefix), []byte(`{"enable-placement-rules":"true"}`)), IsNil)
time.Sleep(20 * time.Millisecond)
for _, t := range cases {
_, err := s.svr.PutStore(context.Background(), &pdpb.PutStoreRequest{
Header: &pdpb.RequestHeader{ClusterId: s.svr.ClusterID()},
Expand Down
18 changes: 18 additions & 0 deletions server/api/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package api

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"

Expand All @@ -37,6 +38,23 @@ func newlogHandler(svr *server.Server, rd *render.Render) *logHandler {
}

func (h *logHandler) Handle(w http.ResponseWriter, r *http.Request) {
if h.svr.GetConfig().EnableConfigManager {
client := h.svr.GetConfigClient()
if client == nil {
h.rd.JSON(w, http.StatusServiceUnavailable, "no leader")
}
cm := h.svr.GetConfigManager()
var str string
json.NewDecoder(r.Body).Decode(&str)
entries := []*entry{{key: "log.level", value: fmt.Sprintf("level = \"%v\"", str)}}
err := redirectUpdateReq(h.svr.Context(), client, cm, entries)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, nil)
return
}
var level string
data, err := ioutil.ReadAll(r.Body)
r.Body.Close()
Expand Down
Loading