-
Notifications
You must be signed in to change notification settings - Fork 726
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
mcs: support forwarding split and scatter request #7190
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -140,13 +140,7 @@ | |
|
||
c := s.GetCluster() | ||
if c == nil { | ||
resp := &schedulingpb.RegionHeartbeatResponse{Header: &schedulingpb.ResponseHeader{ | ||
ClusterId: s.clusterID, | ||
Error: &schedulingpb.Error{ | ||
Type: schedulingpb.ErrorType_NOT_BOOTSTRAPPED, | ||
Message: "scheduling server is not initialized yet", | ||
}, | ||
}} | ||
resp := &schedulingpb.RegionHeartbeatResponse{Header: s.notBootstrappedHeader()} | ||
err := server.Send(resp) | ||
return errors.WithStack(err) | ||
} | ||
|
@@ -177,7 +171,7 @@ | |
if c == nil { | ||
// TODO: add metrics | ||
log.Info("cluster isn't initialized") | ||
return &schedulingpb.StoreHeartbeatResponse{Header: &schedulingpb.ResponseHeader{ClusterId: s.clusterID}}, nil | ||
return &schedulingpb.StoreHeartbeatResponse{Header: s.notBootstrappedHeader()}, nil | ||
} | ||
|
||
if c.GetStore(request.GetStats().GetStoreId()) == nil { | ||
|
@@ -191,6 +185,75 @@ | |
return &schedulingpb.StoreHeartbeatResponse{Header: &schedulingpb.ResponseHeader{ClusterId: s.clusterID}}, nil | ||
} | ||
|
||
// SplitRegions split regions by the given split keys | ||
func (s *Service) SplitRegions(ctx context.Context, request *schedulingpb.SplitRegionsRequest) (*schedulingpb.SplitRegionsResponse, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. BTW, we seem to need to support There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't need AskSplit/AskBatchSplit. But GetOperator is necessary. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, you are right. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will do it in the next PR |
||
c := s.GetCluster() | ||
if c == nil { | ||
return &schedulingpb.SplitRegionsResponse{Header: s.notBootstrappedHeader()}, nil | ||
} | ||
finishedPercentage, newRegionIDs := c.GetRegionSplitter().SplitRegions(ctx, request.GetSplitKeys(), int(request.GetRetryLimit())) | ||
return &schedulingpb.SplitRegionsResponse{ | ||
Header: s.header(), | ||
RegionsId: newRegionIDs, | ||
FinishedPercentage: uint64(finishedPercentage), | ||
}, nil | ||
} | ||
|
||
// ScatterRegions implements gRPC PDServer. | ||
func (s *Service) ScatterRegions(ctx context.Context, request *schedulingpb.ScatterRegionsRequest) (*schedulingpb.ScatterRegionsResponse, error) { | ||
c := s.GetCluster() | ||
if c == nil { | ||
return &schedulingpb.ScatterRegionsResponse{Header: s.notBootstrappedHeader()}, nil | ||
} | ||
|
||
opsCount, failures, err := c.GetRegionScatterer().ScatterRegionsByID(request.GetRegionsId(), request.GetGroup(), int(request.GetRetryLimit()), request.GetSkipStoreLimit()) | ||
if err != nil { | ||
return nil, err | ||
} | ||
percentage := 100 | ||
if len(failures) > 0 { | ||
percentage = 100 - 100*len(failures)/(opsCount+len(failures)) | ||
log.Debug("scatter regions", zap.Errors("failures", func() []error { | ||
r := make([]error, 0, len(failures)) | ||
for _, err := range failures { | ||
r = append(r, err) | ||
} | ||
return r | ||
}())) | ||
} | ||
return &schedulingpb.ScatterRegionsResponse{ | ||
Header: s.header(), | ||
FinishedPercentage: uint64(percentage), | ||
}, nil | ||
} | ||
|
||
// GetOperator gets information about the operator belonging to the specified region. | ||
func (s *Service) GetOperator(ctx context.Context, request *schedulingpb.GetOperatorRequest) (*schedulingpb.GetOperatorResponse, error) { | ||
c := s.GetCluster() | ||
if c == nil { | ||
return &schedulingpb.GetOperatorResponse{Header: s.notBootstrappedHeader()}, nil | ||
} | ||
|
||
opController := c.GetCoordinator().GetOperatorController() | ||
requestID := request.GetRegionId() | ||
r := opController.GetOperatorStatus(requestID) | ||
if r == nil { | ||
header := s.errorHeader(&schedulingpb.Error{ | ||
Type: schedulingpb.ErrorType_UNKNOWN, | ||
Message: "Not Found", | ||
}) | ||
return &schedulingpb.GetOperatorResponse{Header: header}, nil | ||
} | ||
|
||
return &schedulingpb.GetOperatorResponse{ | ||
Header: s.header(), | ||
RegionId: requestID, | ||
Desc: []byte(r.Desc()), | ||
Kind: []byte(r.Kind().String()), | ||
Status: r.Status, | ||
}, nil | ||
} | ||
|
||
// RegisterGRPCService registers the service to gRPC server. | ||
func (s *Service) RegisterGRPCService(g *grpc.Server) { | ||
schedulingpb.RegisterSchedulingServer(g, s) | ||
|
@@ -201,3 +264,29 @@ | |
handler, group := SetUpRestHandler(s) | ||
apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler) | ||
} | ||
|
||
func (s *Service) errorHeader(err *schedulingpb.Error) *schedulingpb.ResponseHeader { | ||
return &schedulingpb.ResponseHeader{ | ||
ClusterId: s.clusterID, | ||
Error: err, | ||
} | ||
} | ||
|
||
func (s *Service) notBootstrappedHeader() *schedulingpb.ResponseHeader { | ||
return s.errorHeader(&schedulingpb.Error{ | ||
Type: schedulingpb.ErrorType_NOT_BOOTSTRAPPED, | ||
Message: "cluster is not initialized", | ||
}) | ||
} | ||
|
||
func (s *Service) header() *schedulingpb.ResponseHeader { | ||
if s.clusterID == 0 { | ||
return s.wrapErrorToHeader(schedulingpb.ErrorType_NOT_BOOTSTRAPPED, "cluster id is not ready") | ||
} | ||
return &schedulingpb.ResponseHeader{ClusterId: s.clusterID} | ||
} | ||
|
||
func (s *Service) wrapErrorToHeader( | ||
errorType schedulingpb.ErrorType, message string) *schedulingpb.ResponseHeader { | ||
return s.errorHeader(&schedulingpb.Error{Type: errorType, Message: message}) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1770,6 +1770,35 @@ | |
|
||
// ScatterRegion implements gRPC PDServer. | ||
func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterRegionRequest) (*pdpb.ScatterRegionResponse, error) { | ||
if s.IsAPIServiceMode() { | ||
s.updateSchedulingClient(ctx) | ||
if s.schedulingClient.Load() != nil { | ||
regionsID := request.GetRegionsId() | ||
if len(regionsID) == 0 { | ||
return &pdpb.ScatterRegionResponse{ | ||
Header: s.invalidValue("regions id is required"), | ||
}, nil | ||
} | ||
req := &schedulingpb.ScatterRegionsRequest{ | ||
Header: &schedulingpb.RequestHeader{ | ||
ClusterId: request.GetHeader().GetClusterId(), | ||
SenderId: request.GetHeader().GetSenderId(), | ||
}, | ||
RegionsId: regionsID, | ||
Group: request.GetGroup(), | ||
RetryLimit: request.GetRetryLimit(), | ||
SkipStoreLimit: request.GetSkipStoreLimit(), | ||
} | ||
resp, err := s.schedulingClient.Load().(*schedulingClient).getClient().ScatterRegions(ctx, req) | ||
if err != nil { | ||
// reset to let it be updated in the next request | ||
s.schedulingClient.Store(&schedulingClient{}) | ||
return s.convertScatterResponse(resp), err | ||
} | ||
return s.convertScatterResponse(resp), nil | ||
} | ||
} | ||
|
||
fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { | ||
return pdpb.NewPDClient(client).ScatterRegion(ctx, request) | ||
} | ||
|
@@ -1960,6 +1989,25 @@ | |
|
||
// GetOperator gets information about the operator belonging to the specified region. | ||
func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorRequest) (*pdpb.GetOperatorResponse, error) { | ||
if s.IsAPIServiceMode() { | ||
s.updateSchedulingClient(ctx) | ||
if s.schedulingClient.Load() != nil { | ||
req := &schedulingpb.GetOperatorRequest{ | ||
Header: &schedulingpb.RequestHeader{ | ||
ClusterId: request.GetHeader().GetClusterId(), | ||
SenderId: request.GetHeader().GetSenderId(), | ||
}, | ||
RegionId: request.GetRegionId(), | ||
} | ||
resp, err := s.schedulingClient.Load().(*schedulingClient).getClient().GetOperator(ctx, req) | ||
if err != nil { | ||
// reset to let it be updated in the next request | ||
s.schedulingClient.Store(&schedulingClient{}) | ||
return s.convertOperatorResponse(resp), err | ||
} | ||
return s.convertOperatorResponse(resp), nil | ||
} | ||
} | ||
fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { | ||
return pdpb.NewPDClient(client).GetOperator(ctx, request) | ||
} | ||
|
@@ -2049,6 +2097,45 @@ | |
}) | ||
} | ||
|
||
func (s *GrpcServer) convertHeader(header *schedulingpb.ResponseHeader) *pdpb.ResponseHeader { | ||
switch header.GetError().GetType() { | ||
case schedulingpb.ErrorType_UNKNOWN: | ||
return &pdpb.ResponseHeader{ | ||
ClusterId: header.GetClusterId(), | ||
Error: &pdpb.Error{ | ||
Type: pdpb.ErrorType_UNKNOWN, | ||
Message: header.GetError().GetMessage(), | ||
}, | ||
} | ||
default: | ||
return &pdpb.ResponseHeader{ClusterId: header.GetClusterId()} | ||
} | ||
} | ||
|
||
func (s *GrpcServer) convertSplitResponse(resp *schedulingpb.SplitRegionsResponse) *pdpb.SplitRegionsResponse { | ||
return &pdpb.SplitRegionsResponse{ | ||
Header: s.convertHeader(resp.GetHeader()), | ||
FinishedPercentage: resp.GetFinishedPercentage(), | ||
} | ||
} | ||
|
||
func (s *GrpcServer) convertScatterResponse(resp *schedulingpb.ScatterRegionsResponse) *pdpb.ScatterRegionResponse { | ||
return &pdpb.ScatterRegionResponse{ | ||
Header: s.convertHeader(resp.GetHeader()), | ||
FinishedPercentage: resp.GetFinishedPercentage(), | ||
} | ||
} | ||
|
||
func (s *GrpcServer) convertOperatorResponse(resp *schedulingpb.GetOperatorResponse) *pdpb.GetOperatorResponse { | ||
return &pdpb.GetOperatorResponse{ | ||
Header: s.convertHeader(resp.GetHeader()), | ||
RegionId: resp.GetRegionId(), | ||
Desc: resp.GetDesc(), | ||
Kind: resp.GetKind(), | ||
Status: resp.GetStatus(), | ||
} | ||
} | ||
|
||
// Only used for the TestLocalAllocatorLeaderChange. | ||
var mockLocalAllocatorLeaderChangeFlag = false | ||
|
||
|
@@ -2153,6 +2240,27 @@ | |
|
||
// SplitRegions split regions by the given split keys | ||
func (s *GrpcServer) SplitRegions(ctx context.Context, request *pdpb.SplitRegionsRequest) (*pdpb.SplitRegionsResponse, error) { | ||
if s.IsAPIServiceMode() { | ||
s.updateSchedulingClient(ctx) | ||
if s.schedulingClient.Load() != nil { | ||
req := &schedulingpb.SplitRegionsRequest{ | ||
Header: &schedulingpb.RequestHeader{ | ||
ClusterId: request.GetHeader().GetClusterId(), | ||
SenderId: request.GetHeader().GetSenderId(), | ||
}, | ||
SplitKeys: request.GetSplitKeys(), | ||
RetryLimit: request.GetRetryLimit(), | ||
} | ||
resp, err := s.schedulingClient.Load().(*schedulingClient).getClient().SplitRegions(ctx, req) | ||
if err != nil { | ||
// reset to let it be updated in the next request | ||
s.schedulingClient.Store(&schedulingClient{}) | ||
return s.convertSplitResponse(resp), err | ||
} | ||
return s.convertSplitResponse(resp), nil | ||
} | ||
} | ||
|
||
fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { | ||
return pdpb.NewPDClient(client).SplitRegions(ctx, request) | ||
} | ||
|
@@ -2175,7 +2283,7 @@ | |
} | ||
|
||
// SplitAndScatterRegions split regions by the given split keys, and scatter regions. | ||
// Only regions which splited successfully will be scattered. | ||
// Only regions which split successfully will be scattered. | ||
// scatterFinishedPercentage indicates the percentage of successfully split regions that are scattered. | ||
func (s *GrpcServer) SplitAndScatterRegions(ctx context.Context, request *pdpb.SplitAndScatterRegionsRequest) (*pdpb.SplitAndScatterRegionsResponse, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to forward this functions? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. AFAIK, this API is not used for now. |
||
fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to add some tests about forward requests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I have added a TODO in the PR description. Will do it later.