Skip to content

Commit

Permalink
Merge branch 'master' of http://github.com/tikv/pd into rw-pending
Browse files Browse the repository at this point in the history
  • Loading branch information
lhy1024 committed Sep 17, 2022
2 parents 1cdd39b + 1218b4d commit d180cc0
Show file tree
Hide file tree
Showing 20 changed files with 754 additions and 163 deletions.
17 changes: 16 additions & 1 deletion errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,26 @@ error = '''
stop dashboard failed
'''

["PD:diagnostic:ErrDiagnosticLoadPlanError"]
["PD:diagnostic:ErrDiagnosticDisabled"]
error = '''
diagnostic is disabled
'''

["PD:diagnostic:ErrDiagnosticLoadPlan"]
error = '''
load plan failed
'''

["PD:diagnostic:ErrNoDiagnosticResult"]
error = '''
%v has no diagnostic result
'''

["PD:diagnostic:ErrSchedulerUndiagnosable"]
error = '''
%v hasn't supported diagnostic
'''

["PD:dir:ErrReadDirName"]
error = '''
read dir name error
Expand Down
7 changes: 5 additions & 2 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,12 @@ var (
ErrCheckerMergeAgain = errors.Normalize("region will be merged again, %s", errors.RFCCodeText("PD:checker:ErrCheckerMergeAgain"))
)

// diagnose errors
// diagnostic errors
var (
ErrDiagnosticLoadPlanError = errors.Normalize("load plan failed", errors.RFCCodeText("PD:diagnostic:ErrDiagnosticLoadPlanError"))
ErrDiagnosticDisabled = errors.Normalize("diagnostic is disabled", errors.RFCCodeText("PD:diagnostic:ErrDiagnosticDisabled"))
ErrSchedulerUndiagnosable = errors.Normalize("%v hasn't supported diagnostic", errors.RFCCodeText("PD:diagnostic:ErrSchedulerUndiagnosable"))
ErrNoDiagnosticResult = errors.Normalize("%v has no diagnostic result", errors.RFCCodeText("PD:diagnostic:ErrNoDiagnosticResult"))
ErrDiagnosticLoadPlan = errors.Normalize("load plan failed", errors.RFCCodeText("PD:diagnostic:ErrDiagnosticLoadPlan"))
)

// placement errors
Expand Down
52 changes: 52 additions & 0 deletions server/api/diagnostic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"net/http"

"github.com/gorilla/mux"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/cluster"
"github.com/unrolled/render"
)

type diagnosticHandler struct {
svr *server.Server
rd *render.Render
}

func newDiagnosticHandler(svr *server.Server, rd *render.Render) *diagnosticHandler {
return &diagnosticHandler{
svr: svr,
rd: rd,
}
}

func (h *diagnosticHandler) GetDiagnosticResult(w http.ResponseWriter, r *http.Request) {
name := mux.Vars(r)["name"]
if _, ok := cluster.DiagnosableSummaryFunc[name]; !ok {
h.rd.JSON(w, http.StatusBadRequest, errs.ErrSchedulerUndiagnosable.FastGenByArgs(name).Error())
return
}
rc := getCluster(r)
result, err := rc.GetCoordinator().GetDiagnosticResult(name)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, result)
}
145 changes: 145 additions & 0 deletions server/api/diagnostic_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"encoding/json"
"fmt"
"testing"
"time"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/apiutil"
tu "github.com/tikv/pd/pkg/testutil"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/cluster"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedulers"
)

type diagnosticTestSuite struct {
suite.Suite
svr *server.Server
cleanup cleanUpFunc
urlPrefix string
configPrefix string
schedulerPrifex string
}

func TestDiagnosticTestSuite(t *testing.T) {
suite.Run(t, new(diagnosticTestSuite))
}

func (suite *diagnosticTestSuite) SetupSuite() {
re := suite.Require()
suite.svr, suite.cleanup = mustNewServer(re)
server.MustWaitLeader(re, []*server.Server{suite.svr})

addr := suite.svr.GetAddr()
suite.urlPrefix = fmt.Sprintf("%s%s/api/v1/schedulers/diagnostic", addr, apiPrefix)
suite.schedulerPrifex = fmt.Sprintf("%s%s/api/v1/schedulers", addr, apiPrefix)
suite.configPrefix = fmt.Sprintf("%s%s/api/v1/config", addr, apiPrefix)

mustBootstrapCluster(re, suite.svr)
mustPutStore(re, suite.svr, 1, metapb.StoreState_Up, metapb.NodeState_Serving, nil)
}

func (suite *diagnosticTestSuite) TearDownSuite() {
suite.cleanup()
}

func (suite *diagnosticTestSuite) TestSchedulerDiagnosticAPI() {
re := suite.Require()
addr := suite.configPrefix
cfg := &config.Config{}
err := tu.ReadGetJSON(re, testDialClient, addr, cfg)
suite.NoError(err)

suite.NoError(tu.ReadGetJSON(re, testDialClient, addr, cfg))
suite.False(cfg.Schedule.EnableDiagnostic)

ms := map[string]interface{}{
"enable-diagnostic": "true",
"max-replicas": 1,
}
postData, err := json.Marshal(ms)
suite.NoError(err)
suite.NoError(tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)))
cfg = &config.Config{}
suite.NoError(tu.ReadGetJSON(re, testDialClient, addr, cfg))
suite.True(cfg.Schedule.EnableDiagnostic)

balanceRegionURL := suite.urlPrefix + "/" + schedulers.BalanceRegionName
result := &cluster.DiagnosticResult{}
err = tu.ReadGetJSON(re, testDialClient, balanceRegionURL, result)
suite.NoError(err)
suite.Equal("disabled", result.Status)

evictLeaderURL := suite.urlPrefix + "/" + schedulers.EvictLeaderName
suite.NoError(tu.CheckGetJSON(testDialClient, evictLeaderURL, nil, tu.StatusNotOK(re)))

input := make(map[string]interface{})
input["name"] = schedulers.BalanceRegionName
body, err := json.Marshal(input)
suite.NoError(err)
err = tu.CheckPostJSON(testDialClient, suite.schedulerPrifex, body, tu.StatusOK(suite.Require()))
suite.NoError(err)

time.Sleep(time.Millisecond * 50)
result = &cluster.DiagnosticResult{}
err = tu.ReadGetJSON(re, testDialClient, balanceRegionURL, result)
suite.NoError(err)
suite.Equal("pending", result.Status)

input = make(map[string]interface{})
input["delay"] = 30
pauseArgs, err := json.Marshal(input)
suite.NoError(err)
err = tu.CheckPostJSON(testDialClient, suite.schedulerPrifex+"/"+schedulers.BalanceRegionName, pauseArgs, tu.StatusOK(re))
suite.NoError(err)
time.Sleep(time.Millisecond * 50)
result = &cluster.DiagnosticResult{}
err = tu.ReadGetJSON(re, testDialClient, balanceRegionURL, result)
suite.NoError(err)
suite.Equal("paused", result.Status)

input["delay"] = 0
pauseArgs, err = json.Marshal(input)
suite.NoError(err)
err = tu.CheckPostJSON(testDialClient, suite.schedulerPrifex+"/"+schedulers.BalanceRegionName, pauseArgs, tu.StatusOK(re))
suite.NoError(err)
time.Sleep(time.Millisecond * 50)
result = &cluster.DiagnosticResult{}
err = tu.ReadGetJSON(re, testDialClient, balanceRegionURL, result)
suite.NoError(err)
suite.Equal("pending", result.Status)

mustPutRegion(re, suite.svr, 1000, 1, []byte("a"), []byte("b"), core.SetApproximateSize(60))
time.Sleep(time.Millisecond * 50)
result = &cluster.DiagnosticResult{}
err = tu.ReadGetJSON(re, testDialClient, balanceRegionURL, result)
suite.NoError(err)
suite.Equal("normal", result.Status)

deleteURL := fmt.Sprintf("%s/%s", suite.schedulerPrifex, schedulers.BalanceRegionName)
_, err = apiutil.DoDelete(testDialClient, deleteURL)
suite.NoError(err)
result = &cluster.DiagnosticResult{}
err = tu.ReadGetJSON(re, testDialClient, balanceRegionURL, result)
suite.NoError(err)
suite.Equal("disabled", result.Status)
}
18 changes: 18 additions & 0 deletions server/api/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,24 @@ func (suite *transferRegionOperatorTestSuite) TestTransferRegionWithPlacementRul
}
}

func mustPutRegion(re *require.Assertions, svr *server.Server, regionID, storeID uint64, start, end []byte, opts ...core.RegionCreateOption) *core.RegionInfo {
leader := &metapb.Peer{
Id: regionID,
StoreId: storeID,
}
metaRegion := &metapb.Region{
Id: regionID,
StartKey: start,
EndKey: end,
Peers: []*metapb.Peer{leader},
RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1},
}
r := core.NewRegionInfo(metaRegion, leader, opts...)
err := svr.GetRaftCluster().HandleRegionHeartbeat(r)
re.NoError(err)
return r
}

func mustPutStore(re *require.Assertions, svr *server.Server, id uint64, state metapb.StoreState, nodeState metapb.NodeState, labels []*metapb.StoreLabel) {
s := &server.GrpcServer{Server: svr}
_, err := s.PutStore(context.Background(), &pdpb.PutStoreRequest{
Expand Down
3 changes: 3 additions & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
registerFunc(apiRouter, "/schedulers/{name}", schedulerHandler.DeleteScheduler, setMethods(http.MethodDelete))
registerFunc(apiRouter, "/schedulers/{name}", schedulerHandler.PauseOrResumeScheduler, setMethods(http.MethodPost))

diagnosticHandler := newDiagnosticHandler(svr, rd)
registerFunc(clusterRouter, "/schedulers/diagnostic/{name}", diagnosticHandler.GetDiagnosticResult, setMethods(http.MethodGet))

schedulerConfigHandler := newSchedulerConfigHandler(svr, rd)
registerPrefix(apiRouter, "/scheduler-config", schedulerConfigHandler.GetSchedulerConfig)

Expand Down
Loading

0 comments on commit d180cc0

Please sign in to comment.