Skip to content

Commit

Permalink
gc: support to update safepoint for specific service
Browse files Browse the repository at this point in the history
Signed-off-by: Shafreeck Sea <[email protected]>
  • Loading branch information
shafreeck committed Apr 17, 2020
1 parent c84d31b commit 1bb6604
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 10 deletions.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ require (
github.com/gorilla/mux v1.7.3
github.com/gorilla/websocket v1.2.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway v1.12.1
github.com/grpc-ecosystem/grpc-gateway v1.14.3
github.com/json-iterator/go v1.1.9 // indirect
github.com/juju/ratelimit v1.0.1
github.com/kevinburke/go-bindata v3.18.0+incompatible
Expand Down Expand Up @@ -68,3 +68,5 @@ require (
gopkg.in/yaml.v2 v2.2.8 // indirect
honnef.co/go/tools v0.0.1-2020.1.3
)

replace github.com/pingcap/kvproto => /home/shafreeck/Codes/pingcap/kvproto
11 changes: 2 additions & 9 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+
github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA=
github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v0.0.0-20180717141946-636bf0302bc9/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
Expand All @@ -138,7 +137,6 @@ github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4er
github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff h1:kOkM9whyQYodu09SJ6W3NCsHG7crFaJILQ22Gozp3lg=
github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v0.0.0-20180814211427-aa810b61a9c7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand Down Expand Up @@ -166,8 +164,9 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92Bcuy
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.9.5 h1:UImYN5qQ8tuGpGE16ZmjvcTtTw24zw1QAp/SlnNrZhI=
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.12.1 h1:zCy2xE9ablevUOrUZc3Dl72Dt+ya2FNAvC2yLYMHzi4=
github.com/grpc-ecosystem/grpc-gateway v1.12.1/go.mod h1:8XEsbTttt/W+VvjtQhLACqCisSPWTxCZ7sBRjU6iH9c=
github.com/grpc-ecosystem/grpc-gateway v1.14.3 h1:OCJlWkOUoTnl0neNGlf4fUm3TmbEtguw7vR+nGtnDjY=
github.com/grpc-ecosystem/grpc-gateway v1.14.3/go.mod h1:6CwZWGDSPRJidgKAtJVvND6soZe6fT7iteq8wDPdhb0=
github.com/gtank/cryptopasta v0.0.0-20170601214702-1f550f6f2f69 h1:7xsUJsB2NrdcttQPa7JLEaGzvdbk7KvfrjgHZXOQRo0=
github.com/gtank/cryptopasta v0.0.0-20170601214702-1f550f6f2f69/go.mod h1:YLEMZOtU+AZ7dhN9T/IpGhXVGly2bvkJQ+zxj3WeVQo=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
Expand Down Expand Up @@ -288,10 +287,6 @@ github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011 h1:58naV4XMEqm0h
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d h1:F8vp38kTAckN+v8Jlc98uMBvKIzr1a+UhnLyVYn8Q5Q=
github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200214064158-62d31900d88e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200409042513-05af14db7537 h1:MwOTgYJ7EKcXt6Tci+VsDZit0KzFMQ0mNpJmFpcnFrE=
github.com/pingcap/kvproto v0.0.0-20200409042513-05af14db7537/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd h1:CV3VsP3Z02MVtdpTMfEgRJ4T9NGgGTxdHpJerent7rM=
Expand Down Expand Up @@ -539,12 +534,10 @@ google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9Ywl
google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20190927181202-20e1ac93f88c h1:hrpEMCZ2O7DR5gC1n2AJGVhrwiEjOi35+jxtIuZpTMo=
google.golang.org/genproto v0.0.0-20190927181202-20e1ac93f88c/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8=
google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
Expand Down
52 changes: 52 additions & 0 deletions server/core/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"
"sync"
"sync/atomic"
"time"

"github.com/BurntSushi/toml"
"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -385,6 +386,57 @@ func (s *Storage) LoadGCSafePoint() (uint64, error) {
return safePoint, nil
}

// ServiceSafePoint is the safepoint for a specific service
type ServiceSafePoint struct {
ServiceID string
ExpiredAt int64
SafePoint uint64
}

// SaveServiceGCSafePoint saves a GC safepoint for the service
func (s *Storage) SaveServiceGCSafePoint(ssp *ServiceSafePoint) error {
key := path.Join(gcPath, "safe_point", "service", ssp.ServiceID)
value, err := json.Marshal(ssp)
if err != nil {
return err
}

return s.Save(key, string(value))
}

// LoadMinServiceGCSafePoint returns the minimum safepoint across all services
func (s *Storage) LoadMinServiceGCSafePoint() (*ServiceSafePoint, error) {
prefix := path.Join(gcPath, "safe_point", "service")
// the next of 'e' is 'f'
prefixEnd := path.Join(gcPath, "safe_point", "servicf")
keys, values, err := s.LoadRange(prefix, prefixEnd, math.MaxInt32)
if err != nil {
return nil, err
}
if len(keys) == 0 {
return &ServiceSafePoint{}, nil
}

min := &ServiceSafePoint{SafePoint: math.MaxUint64}
ssp := &ServiceSafePoint{}
now := time.Now().Unix()
for i, key := range keys {
if err := json.Unmarshal([]byte(values[i]), ssp); err != nil {
return nil, err
}
if ssp.ExpiredAt < now {
s.Remove(key)
continue
}
if ssp.SafePoint < min.SafePoint {
min = ssp
}
}

return min, nil

}

// LoadAllScheduleConfig loads all schedulers' config.
func (s *Storage) LoadAllScheduleConfig() ([]string, []string, error) {
keys, values, err := s.LoadRange(customScheduleConfigPath, clientv3.GetPrefixRangeEnd(customScheduleConfigPath), 1000)
Expand Down
33 changes: 33 additions & 0 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,39 @@ func (s *Server) UpdateGCSafePoint(ctx context.Context, request *pdpb.UpdateGCSa
}, nil
}

// UpdateServiceGCSafePoint update the safepoint for specific service
func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.UpdateServiceGCSafePointRequest) (*pdpb.UpdateServiceGCSafePointResponse, error) {
if err := s.validateRequest(request.GetHeader()); err != nil {
return nil, err
}

rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.UpdateServiceGCSafePointResponse{Header: s.notBootstrappedHeader()}, nil
}

ssp := &core.ServiceSafePoint{
ServiceID: string(request.ServiceId),
ExpiredAt: time.Now().Unix() + request.TTL*int64(time.Second),
SafePoint: request.SafePoint,
}
if err := s.storage.SaveServiceGCSafePoint(ssp); err != nil {
return nil, err
}

ssp, err := s.storage.LoadMinServiceGCSafePoint()
if err != nil {
return nil, err
}

return &pdpb.UpdateServiceGCSafePointResponse{
Header: s.header(),
ServiceId: []byte(ssp.ServiceID),
TTL: ssp.ExpiredAt - time.Now().Unix(),
MinSafePoint: ssp.SafePoint,
}, nil
}

// GetOperator gets information about the operator belonging to the speicfy region.
func (s *Server) GetOperator(ctx context.Context, request *pdpb.GetOperatorRequest) (*pdpb.GetOperatorResponse, error) {
if err := s.validateRequest(request.GetHeader()); err != nil {
Expand Down
10 changes: 10 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ type Server struct {
// Add callback functions at different stages
startCallbacks []func()
closeCallbacks []func()

// The minimum safe point of all services
minSafePoint uint64
}

// HandlerBuilder builds a server HTTP handler.
Expand Down Expand Up @@ -370,6 +373,13 @@ func (s *Server) startServer(ctx context.Context) error {
}
s.hbStreams = newHeartbeatStreams(ctx, s.clusterID, s.cluster)

// Load the min safe point
minSafePoint, err := s.storage.LoadGCSafePoint()
if err != nil {
return err
}
s.minSafePoint = minSafePoint

// Run callbacks
for _, cb := range s.startCallbacks {
cb()
Expand Down

0 comments on commit 1bb6604

Please sign in to comment.