Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#7054
Browse files Browse the repository at this point in the history
close tikv#7053

Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
HuSharp authored and ti-chi-bot committed Sep 11, 2023
1 parent 41f8422 commit 8b1e141
Show file tree
Hide file tree
Showing 5 changed files with 707 additions and 20 deletions.
127 changes: 127 additions & 0 deletions server/api/min_resolved_ts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"net/http"
"strconv"
"strings"

"github.com/gorilla/mux"
"github.com/tikv/pd/pkg/utils/typeutil"
"github.com/tikv/pd/server"
"github.com/unrolled/render"
)

type minResolvedTSHandler struct {
svr *server.Server
rd *render.Render
}

func newMinResolvedTSHandler(svr *server.Server, rd *render.Render) *minResolvedTSHandler {
return &minResolvedTSHandler{
svr: svr,
rd: rd,
}
}

// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it.
type minResolvedTS struct {
IsRealTime bool `json:"is_real_time,omitempty"`
MinResolvedTS uint64 `json:"min_resolved_ts"`
PersistInterval typeutil.Duration `json:"persist_interval,omitempty"`
StoresMinResolvedTS map[uint64]uint64 `json:"stores_min_resolved_ts"`
}

// @Tags min_store_resolved_ts
// @Summary Get store-level min resolved ts.
// @Produce json
// @Success 200 {array} minResolvedTS
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /min-resolved-ts/{store_id} [get]
func (h *minResolvedTSHandler) GetStoreMinResolvedTS(w http.ResponseWriter, r *http.Request) {
c := getCluster(r)
idStr := mux.Vars(r)["store_id"]
storeID, err := strconv.ParseUint(idStr, 10, 64)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
value := c.GetStoreMinResolvedTS(storeID)
persistInterval := c.GetPDServerConfig().MinResolvedTSPersistenceInterval
h.rd.JSON(w, http.StatusOK, minResolvedTS{
MinResolvedTS: value,
PersistInterval: persistInterval,
IsRealTime: persistInterval.Duration != 0,
})
}

// @Tags min_resolved_ts
// @Summary Get cluster-level min resolved ts and optionally store-level min resolved ts.
// @Description Optionally, we support a query parameter `scope`
// to get store-level min resolved ts by specifying a list of store IDs.
// - When no scope is given, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be nil.
// - When scope is `cluster`, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be filled.
// - When scope given a list of stores, min_resolved_ts will be provided for each store
// and the scope-specific min_resolved_ts will be returned.
//
// @Produce json
// @Param scope query string false "Scope of the min resolved ts: comma-separated list of store IDs (e.g., '1,2,3')." default(cluster)
// @Success 200 {array} minResolvedTS
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /min-resolved-ts [get]
func (h *minResolvedTSHandler) GetMinResolvedTS(w http.ResponseWriter, r *http.Request) {
c := getCluster(r)
scopeMinResolvedTS := c.GetMinResolvedTS()
persistInterval := c.GetPDServerConfig().MinResolvedTSPersistenceInterval

var storesMinResolvedTS map[uint64]uint64
if scopeStr := r.URL.Query().Get("scope"); len(scopeStr) > 0 {
// scope is an optional parameter, it can be `cluster` or specified store IDs.
// - When no scope is given, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be nil.
// - When scope is `cluster`, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be filled.
// - When scope given a list of stores, min_resolved_ts will be provided for each store
// and the scope-specific min_resolved_ts will be returned.
if scopeStr == "cluster" {
stores := c.GetMetaStores()
ids := make([]uint64, len(stores))
for i, store := range stores {
ids[i] = store.GetId()
}
// use cluster-level min_resolved_ts as the scope-specific min_resolved_ts.
_, storesMinResolvedTS = c.GetMinResolvedTSByStoreIDs(ids)
} else {
scopeIDs := strings.Split(scopeStr, ",")
ids := make([]uint64, len(scopeIDs))
for i, idStr := range scopeIDs {
id, err := strconv.ParseUint(idStr, 10, 64)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
ids[i] = id
}
scopeMinResolvedTS, storesMinResolvedTS = c.GetMinResolvedTSByStoreIDs(ids)
}
}

h.rd.JSON(w, http.StatusOK, minResolvedTS{
MinResolvedTS: scopeMinResolvedTS,
PersistInterval: persistInterval,
IsRealTime: persistInterval.Duration != 0,
StoresMinResolvedTS: storesMinResolvedTS,
})
}
53 changes: 52 additions & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1062,7 +1062,6 @@ func (s *Server) ReportBatchSplit(ctx context.Context, request *pdpb.ReportBatch
if rc == nil {
return &pdpb.ReportBatchSplitResponse{Header: s.notBootstrappedHeader()}, nil
}

_, err := rc.HandleBatchReportSplit(request)
if err != nil {
return nil, status.Errorf(codes.Unknown, err.Error())
Expand Down Expand Up @@ -1551,6 +1550,58 @@ func (s *Server) SplitRegions(ctx context.Context, request *pdpb.SplitRegionsReq
}, nil
}

<<<<<<< HEAD
=======
// SplitAndScatterRegions split regions by the given split keys, and scatter regions.
// Only regions which splited successfully will be scattered.
// scatterFinishedPercentage indicates the percentage of successfully splited regions that are scattered.
func (s *GrpcServer) SplitAndScatterRegions(ctx context.Context, request *pdpb.SplitAndScatterRegionsRequest) (*pdpb.SplitAndScatterRegionsResponse, error) {
fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) {
return pdpb.NewPDClient(client).SplitAndScatterRegions(ctx, request)
}
if rsp, err := s.unaryMiddleware(ctx, request, fn); err != nil {
return nil, err
} else if rsp != nil {
return rsp.(*pdpb.SplitAndScatterRegionsResponse), err
}
rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.SplitAndScatterRegionsResponse{Header: s.notBootstrappedHeader()}, nil
}
splitFinishedPercentage, newRegionIDs := rc.GetRegionSplitter().SplitRegions(ctx, request.GetSplitKeys(), int(request.GetRetryLimit()))
scatterFinishedPercentage, err := scatterRegions(rc, newRegionIDs, request.GetGroup(), int(request.GetRetryLimit()), false)
if err != nil {
return nil, err
}
return &pdpb.SplitAndScatterRegionsResponse{
Header: s.header(),
RegionsId: newRegionIDs,
SplitFinishedPercentage: uint64(splitFinishedPercentage),
ScatterFinishedPercentage: uint64(scatterFinishedPercentage),
}, nil
}

// scatterRegions add operators to scatter regions and return the processed percentage and error
func scatterRegions(cluster *cluster.RaftCluster, regionsID []uint64, group string, retryLimit int, skipStoreLimit bool) (int, error) {
opsCount, failures, err := cluster.GetRegionScatter().ScatterRegionsByID(regionsID, group, retryLimit, skipStoreLimit)
if err != nil {
return 0, err
}
percentage := 100
if len(failures) > 0 {
percentage = 100 - 100*len(failures)/(opsCount+len(failures))
log.Debug("scatter regions", zap.Errors("failures", func() []error {
r := make([]error, 0, len(failures))
for _, err := range failures {
r = append(r, err)
}
return r
}()))
}
return percentage, nil
}

>>>>>>> d03f485c9 (*: check raftcluster nil (#7054))
// GetDCLocationInfo gets the dc-location info of the given dc-location from PD leader's TSO allocator manager.
func (s *Server) GetDCLocationInfo(ctx context.Context, request *pdpb.GetDCLocationInfoRequest) (*pdpb.GetDCLocationInfoResponse, error) {
var err error
Expand Down
14 changes: 10 additions & 4 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,14 +884,20 @@ func (h *Handler) ResetTS(ts uint64) error {

// SetStoreLimitScene sets the limit values for different scenes
func (h *Handler) SetStoreLimitScene(scene *storelimit.Scene, limitType storelimit.Type) {
cluster := h.s.GetRaftCluster()
cluster.GetStoreLimiter().ReplaceStoreLimitScene(scene, limitType)
rc := h.s.GetRaftCluster()
if rc == nil {
return
}
rc.GetStoreLimiter().ReplaceStoreLimitScene(scene, limitType)
}

// GetStoreLimitScene returns the limit values for different scenes
func (h *Handler) GetStoreLimitScene(limitType storelimit.Type) *storelimit.Scene {
cluster := h.s.GetRaftCluster()
return cluster.GetStoreLimiter().StoreLimitScene(limitType)
rc := h.s.GetRaftCluster()
if rc == nil {
return nil
}
return rc.GetStoreLimiter().StoreLimitScene(limitType)
}

// PluginLoad loads the plugin referenced by the pluginPath
Expand Down
Loading

0 comments on commit 8b1e141

Please sign in to comment.