From 1bb6604a28385efaf45915f3fad1295bdc6eae25 Mon Sep 17 00:00:00 2001 From: Shafreeck Sea Date: Fri, 17 Apr 2020 15:41:34 +0800 Subject: [PATCH 01/17] gc: support to update safepoint for specific service Signed-off-by: Shafreeck Sea --- go.mod | 4 +++- go.sum | 11 ++------- server/core/storage.go | 52 ++++++++++++++++++++++++++++++++++++++++++ server/grpc_service.go | 33 +++++++++++++++++++++++++++ server/server.go | 10 ++++++++ 5 files changed, 100 insertions(+), 10 deletions(-) diff --git a/go.mod b/go.mod index 65b8d467d54..0e8798c70af 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,7 @@ require ( github.com/gorilla/mux v1.7.3 github.com/gorilla/websocket v1.2.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 - github.com/grpc-ecosystem/grpc-gateway v1.12.1 + github.com/grpc-ecosystem/grpc-gateway v1.14.3 github.com/json-iterator/go v1.1.9 // indirect github.com/juju/ratelimit v1.0.1 github.com/kevinburke/go-bindata v3.18.0+incompatible @@ -68,3 +68,5 @@ require ( gopkg.in/yaml.v2 v2.2.8 // indirect honnef.co/go/tools v0.0.1-2020.1.3 ) + +replace github.com/pingcap/kvproto => /home/shafreeck/Codes/pingcap/kvproto diff --git a/go.sum b/go.sum index c185e265054..ce19cbd43cf 100644 --- a/go.sum +++ b/go.sum @@ -124,7 +124,6 @@ github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+ github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= -github.com/gogo/protobuf v0.0.0-20180717141946-636bf0302bc9/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= @@ -138,7 +137,6 @@ github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4er github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff h1:kOkM9whyQYodu09SJ6W3NCsHG7crFaJILQ22Gozp3lg= github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= -github.com/golang/protobuf v0.0.0-20180814211427-aa810b61a9c7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -166,8 +164,9 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92Bcuy github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.9.5 h1:UImYN5qQ8tuGpGE16ZmjvcTtTw24zw1QAp/SlnNrZhI= github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= -github.com/grpc-ecosystem/grpc-gateway v1.12.1 h1:zCy2xE9ablevUOrUZc3Dl72Dt+ya2FNAvC2yLYMHzi4= github.com/grpc-ecosystem/grpc-gateway v1.12.1/go.mod h1:8XEsbTttt/W+VvjtQhLACqCisSPWTxCZ7sBRjU6iH9c= +github.com/grpc-ecosystem/grpc-gateway v1.14.3 h1:OCJlWkOUoTnl0neNGlf4fUm3TmbEtguw7vR+nGtnDjY= +github.com/grpc-ecosystem/grpc-gateway v1.14.3/go.mod h1:6CwZWGDSPRJidgKAtJVvND6soZe6fT7iteq8wDPdhb0= github.com/gtank/cryptopasta v0.0.0-20170601214702-1f550f6f2f69 h1:7xsUJsB2NrdcttQPa7JLEaGzvdbk7KvfrjgHZXOQRo0= github.com/gtank/cryptopasta v0.0.0-20170601214702-1f550f6f2f69/go.mod h1:YLEMZOtU+AZ7dhN9T/IpGhXVGly2bvkJQ+zxj3WeVQo= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= @@ -288,10 +287,6 @@ github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011 h1:58naV4XMEqm0h github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d h1:F8vp38kTAckN+v8Jlc98uMBvKIzr1a+UhnLyVYn8Q5Q= github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI= -github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20200214064158-62d31900d88e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20200409042513-05af14db7537 h1:MwOTgYJ7EKcXt6Tci+VsDZit0KzFMQ0mNpJmFpcnFrE= -github.com/pingcap/kvproto v0.0.0-20200409042513-05af14db7537/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd h1:CV3VsP3Z02MVtdpTMfEgRJ4T9NGgGTxdHpJerent7rM= @@ -539,12 +534,10 @@ google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9Ywl google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= -google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20190927181202-20e1ac93f88c h1:hrpEMCZ2O7DR5gC1n2AJGVhrwiEjOi35+jxtIuZpTMo= google.golang.org/genproto v0.0.0-20190927181202-20e1ac93f88c/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8= -google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= diff --git a/server/core/storage.go b/server/core/storage.go index 503a23d8cf6..f700a346997 100644 --- a/server/core/storage.go +++ b/server/core/storage.go @@ -23,6 +23,7 @@ import ( "strings" "sync" "sync/atomic" + "time" "github.com/BurntSushi/toml" "github.com/gogo/protobuf/proto" @@ -385,6 +386,57 @@ func (s *Storage) LoadGCSafePoint() (uint64, error) { return safePoint, nil } +// ServiceSafePoint is the safepoint for a specific service +type ServiceSafePoint struct { + ServiceID string + ExpiredAt int64 + SafePoint uint64 +} + +// SaveServiceGCSafePoint saves a GC safepoint for the service +func (s *Storage) SaveServiceGCSafePoint(ssp *ServiceSafePoint) error { + key := path.Join(gcPath, "safe_point", "service", ssp.ServiceID) + value, err := json.Marshal(ssp) + if err != nil { + return err + } + + return s.Save(key, string(value)) +} + +// LoadMinServiceGCSafePoint returns the minimum safepoint across all services +func (s *Storage) LoadMinServiceGCSafePoint() (*ServiceSafePoint, error) { + prefix := path.Join(gcPath, "safe_point", "service") + // the next of 'e' is 'f' + prefixEnd := path.Join(gcPath, "safe_point", "servicf") + keys, values, err := s.LoadRange(prefix, prefixEnd, math.MaxInt32) + if err != nil { + return nil, err + } + if len(keys) == 0 { + return &ServiceSafePoint{}, nil + } + + min := &ServiceSafePoint{SafePoint: math.MaxUint64} + ssp := &ServiceSafePoint{} + now := time.Now().Unix() + for i, key := range keys { + if err := json.Unmarshal([]byte(values[i]), ssp); err != nil { + return nil, err + } + if ssp.ExpiredAt < now { + s.Remove(key) + continue + } + if ssp.SafePoint < min.SafePoint { + min = ssp + } + } + + return min, nil + +} + // LoadAllScheduleConfig loads all schedulers' config. func (s *Storage) LoadAllScheduleConfig() ([]string, []string, error) { keys, values, err := s.LoadRange(customScheduleConfigPath, clientv3.GetPrefixRangeEnd(customScheduleConfigPath), 1000) diff --git a/server/grpc_service.go b/server/grpc_service.go index 94d8ac5737d..d70e8a1e1de 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -734,6 +734,39 @@ func (s *Server) UpdateGCSafePoint(ctx context.Context, request *pdpb.UpdateGCSa }, nil } +// UpdateServiceGCSafePoint update the safepoint for specific service +func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.UpdateServiceGCSafePointRequest) (*pdpb.UpdateServiceGCSafePointResponse, error) { + if err := s.validateRequest(request.GetHeader()); err != nil { + return nil, err + } + + rc := s.GetRaftCluster() + if rc == nil { + return &pdpb.UpdateServiceGCSafePointResponse{Header: s.notBootstrappedHeader()}, nil + } + + ssp := &core.ServiceSafePoint{ + ServiceID: string(request.ServiceId), + ExpiredAt: time.Now().Unix() + request.TTL*int64(time.Second), + SafePoint: request.SafePoint, + } + if err := s.storage.SaveServiceGCSafePoint(ssp); err != nil { + return nil, err + } + + ssp, err := s.storage.LoadMinServiceGCSafePoint() + if err != nil { + return nil, err + } + + return &pdpb.UpdateServiceGCSafePointResponse{ + Header: s.header(), + ServiceId: []byte(ssp.ServiceID), + TTL: ssp.ExpiredAt - time.Now().Unix(), + MinSafePoint: ssp.SafePoint, + }, nil +} + // GetOperator gets information about the operator belonging to the speicfy region. func (s *Server) GetOperator(ctx context.Context, request *pdpb.GetOperatorRequest) (*pdpb.GetOperatorResponse, error) { if err := s.validateRequest(request.GetHeader()); err != nil { diff --git a/server/server.go b/server/server.go index 0df06c4645c..618557622f4 100644 --- a/server/server.go +++ b/server/server.go @@ -141,6 +141,9 @@ type Server struct { // Add callback functions at different stages startCallbacks []func() closeCallbacks []func() + + // The minimum safe point of all services + minSafePoint uint64 } // HandlerBuilder builds a server HTTP handler. @@ -370,6 +373,13 @@ func (s *Server) startServer(ctx context.Context) error { } s.hbStreams = newHeartbeatStreams(ctx, s.clusterID, s.cluster) + // Load the min safe point + minSafePoint, err := s.storage.LoadGCSafePoint() + if err != nil { + return err + } + s.minSafePoint = minSafePoint + // Run callbacks for _, cb := range s.startCallbacks { cb() From dc52f0ab7fab0d76148e6625302622fefbc696cb Mon Sep 17 00:00:00 2001 From: Shafreeck Sea Date: Fri, 17 Apr 2020 16:23:11 +0800 Subject: [PATCH 02/17] safepoint: add the client API Signed-off-by: Shafreeck Sea --- client/client.go | 36 ++++++++++++++++++++++++++++++++++++ client/metrics.go | 44 +++++++++++++++++++++++--------------------- 2 files changed, 59 insertions(+), 21 deletions(-) diff --git a/client/client.go b/client/client.go index 2fb5c028036..a8be47aed8f 100644 --- a/client/client.go +++ b/client/client.go @@ -66,6 +66,12 @@ type Client interface { // If the given safePoint is less than the current one, it will not be updated. // Returns the new safePoint after updating. UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint64, error) + + // UpdateServiceGCSafePoint updates the safepoint for specific service and + // returns the minimum safepoint across all services, this value is used to + // determine the safepoint for multiple services, it does not tigger a GC + // job. Use UpdateGCSafePoint to trigger the GC job if needed. + UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) // ScatterRegion scatters the specified region. Should use it for a batch of regions, // and the distribution of these regions will be dispersed. ScatterRegion(ctx context.Context, regionID uint64) error @@ -604,6 +610,36 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6 return resp.GetNewSafePoint(), nil } +// UpdateServiceGCSafePoint updates the safepoint for specific service and +// returns the minimum safepoint across all services, this value is used to +// determine the safepoint for multiple services, it does not tigger a GC +// job. Use UpdateGCSafePoint to trigger the GC job if needed. +func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { + if span := opentracing.SpanFromContext(ctx); span != nil { + span = opentracing.StartSpan("pdclient.UpdateServiceGCSafePoint", opentracing.ChildOf(span.Context())) + defer span.Finish() + } + + start := time.Now() + defer func() { cmdDurationUpdateServiceGCSafePoint.Observe(time.Since(start).Seconds()) }() + + ctx, cancel := context.WithTimeout(ctx, pdTimeout) + resp, err := c.leaderClient().UpdateServiceGCSafePoint(ctx, &pdpb.UpdateServiceGCSafePointRequest{ + Header: c.requestHeader(), + ServiceId: []byte(serviceID), + TTL: ttl, + SafePoint: safePoint, + }) + cancel() + + if err != nil { + cmdFailedDurationUpdateServiceGCSafePoint.Observe(time.Since(start).Seconds()) + c.ScheduleCheckLeader() + return 0, errors.WithStack(err) + } + return resp.GetMinSafePoint(), nil +} + func (c *client) ScatterRegion(ctx context.Context, regionID uint64) error { if span := opentracing.SpanFromContext(ctx); span != nil { span = opentracing.StartSpan("pdclient.ScatterRegion", opentracing.ChildOf(span.Context())) diff --git a/client/metrics.go b/client/metrics.go index 45e6b4659cc..5b74cd8f053 100644 --- a/client/metrics.go +++ b/client/metrics.go @@ -73,28 +73,30 @@ var ( var ( // WithLabelValues is a heavy operation, define variable to avoid call it every time. - cmdDurationWait = cmdDuration.WithLabelValues("wait") - cmdDurationTSO = cmdDuration.WithLabelValues("tso") - cmdDurationTSOAsyncWait = cmdDuration.WithLabelValues("tso_async_wait") - cmdDurationGetRegion = cmdDuration.WithLabelValues("get_region") - cmdDurationGetPrevRegion = cmdDuration.WithLabelValues("get_prev_region") - cmdDurationGetRegionByID = cmdDuration.WithLabelValues("get_region_byid") - cmdDurationScanRegions = cmdDuration.WithLabelValues("scan_regions") - cmdDurationGetStore = cmdDuration.WithLabelValues("get_store") - cmdDurationGetAllStores = cmdDuration.WithLabelValues("get_all_stores") - cmdDurationUpdateGCSafePoint = cmdDuration.WithLabelValues("update_gc_safe_point") - cmdDurationScatterRegion = cmdDuration.WithLabelValues("scatter_region") - cmdDurationGetOperator = cmdDuration.WithLabelValues("get_operator") + cmdDurationWait = cmdDuration.WithLabelValues("wait") + cmdDurationTSO = cmdDuration.WithLabelValues("tso") + cmdDurationTSOAsyncWait = cmdDuration.WithLabelValues("tso_async_wait") + cmdDurationGetRegion = cmdDuration.WithLabelValues("get_region") + cmdDurationGetPrevRegion = cmdDuration.WithLabelValues("get_prev_region") + cmdDurationGetRegionByID = cmdDuration.WithLabelValues("get_region_byid") + cmdDurationScanRegions = cmdDuration.WithLabelValues("scan_regions") + cmdDurationGetStore = cmdDuration.WithLabelValues("get_store") + cmdDurationGetAllStores = cmdDuration.WithLabelValues("get_all_stores") + cmdDurationUpdateGCSafePoint = cmdDuration.WithLabelValues("update_gc_safe_point") + cmdDurationUpdateServiceGCSafePoint = cmdDuration.WithLabelValues("update_service_gc_safe_point") + cmdDurationScatterRegion = cmdDuration.WithLabelValues("scatter_region") + cmdDurationGetOperator = cmdDuration.WithLabelValues("get_operator") - cmdFailDurationGetRegion = cmdFailedDuration.WithLabelValues("get_region") - cmdFailDurationTSO = cmdFailedDuration.WithLabelValues("tso") - cmdFailDurationGetPrevRegion = cmdFailedDuration.WithLabelValues("get_prev_region") - cmdFailedDurationGetRegionByID = cmdFailedDuration.WithLabelValues("get_region_byid") - cmdFailedDurationScanRegions = cmdFailedDuration.WithLabelValues("scan_regions") - cmdFailedDurationGetStore = cmdFailedDuration.WithLabelValues("get_store") - cmdFailedDurationGetAllStores = cmdFailedDuration.WithLabelValues("get_all_stores") - cmdFailedDurationUpdateGCSafePoint = cmdFailedDuration.WithLabelValues("update_gc_safe_point") - requestDurationTSO = requestDuration.WithLabelValues("tso") + cmdFailDurationGetRegion = cmdFailedDuration.WithLabelValues("get_region") + cmdFailDurationTSO = cmdFailedDuration.WithLabelValues("tso") + cmdFailDurationGetPrevRegion = cmdFailedDuration.WithLabelValues("get_prev_region") + cmdFailedDurationGetRegionByID = cmdFailedDuration.WithLabelValues("get_region_byid") + cmdFailedDurationScanRegions = cmdFailedDuration.WithLabelValues("scan_regions") + cmdFailedDurationGetStore = cmdFailedDuration.WithLabelValues("get_store") + cmdFailedDurationGetAllStores = cmdFailedDuration.WithLabelValues("get_all_stores") + cmdFailedDurationUpdateGCSafePoint = cmdFailedDuration.WithLabelValues("update_gc_safe_point") + cmdFailedDurationUpdateServiceGCSafePoint = cmdFailedDuration.WithLabelValues("update_service_gc_safe_point") + requestDurationTSO = requestDuration.WithLabelValues("tso") // config configCmdDurationCreate = configCmdDuration.WithLabelValues("create") From 13088d14aed96d684d55f331cd66b92989811bf7 Mon Sep 17 00:00:00 2001 From: Shafreeck Sea Date: Fri, 17 Apr 2020 16:33:47 +0800 Subject: [PATCH 03/17] safepoint: it is no limit when the limit = 0 Signed-off-by: Shafreeck Sea --- server/kv/levedb_kv.go | 2 +- server/kv/mem_kv.go | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/server/kv/levedb_kv.go b/server/kv/levedb_kv.go index 7b5d72ecc39..f786a125974 100644 --- a/server/kv/levedb_kv.go +++ b/server/kv/levedb_kv.go @@ -51,7 +51,7 @@ func (kv *LeveldbKV) LoadRange(startKey, endKey string, limit int) ([]string, [] values := make([]string, 0, limit) count := 0 for iter.Next() { - if count >= limit { + if limit > 0 && count >= limit { break } keys = append(keys, string(iter.Key())) diff --git a/server/kv/mem_kv.go b/server/kv/mem_kv.go index a32d3bc5b84..461991d33f9 100644 --- a/server/kv/mem_kv.go +++ b/server/kv/mem_kv.go @@ -57,7 +57,10 @@ func (kv *memoryKV) LoadRange(key, endKey string, limit int) ([]string, []string kv.tree.AscendRange(memoryKVItem{key, ""}, memoryKVItem{endKey, ""}, func(item btree.Item) bool { keys = append(keys, item.(memoryKVItem).key) values = append(values, item.(memoryKVItem).value) - return len(keys) < limit + if limit > 0 { + return len(keys) < limit + } + return true }) return keys, values, nil } From 4bf889043c15b9d7faf8ae58d4ac40d26908e3b4 Mon Sep 17 00:00:00 2001 From: Shafreeck Sea Date: Fri, 17 Apr 2020 16:38:24 +0800 Subject: [PATCH 04/17] safepoint: fix to return the min value Signed-off-by: Shafreeck Sea --- server/core/storage.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/core/storage.go b/server/core/storage.go index f700a346997..b263c0ef56f 100644 --- a/server/core/storage.go +++ b/server/core/storage.go @@ -409,7 +409,7 @@ func (s *Storage) LoadMinServiceGCSafePoint() (*ServiceSafePoint, error) { prefix := path.Join(gcPath, "safe_point", "service") // the next of 'e' is 'f' prefixEnd := path.Join(gcPath, "safe_point", "servicf") - keys, values, err := s.LoadRange(prefix, prefixEnd, math.MaxInt32) + keys, values, err := s.LoadRange(prefix, prefixEnd, 0) if err != nil { return nil, err } @@ -418,9 +418,9 @@ func (s *Storage) LoadMinServiceGCSafePoint() (*ServiceSafePoint, error) { } min := &ServiceSafePoint{SafePoint: math.MaxUint64} - ssp := &ServiceSafePoint{} now := time.Now().Unix() for i, key := range keys { + ssp := &ServiceSafePoint{} if err := json.Unmarshal([]byte(values[i]), ssp); err != nil { return nil, err } From c68c409c409d7eeec6d463a0b9985e051c56d9d1 Mon Sep 17 00:00:00 2001 From: Shafreeck Sea Date: Fri, 17 Apr 2020 16:59:03 +0800 Subject: [PATCH 05/17] safepoint: add unit tests Signed-off-by: Shafreeck Sea --- server/core/storage_test.go | 57 +++++++++++++++++++++++++++++++++++++ tests/client/client_test.go | 27 ++++++++++++++++++ 2 files changed, 84 insertions(+) diff --git a/server/core/storage_test.go b/server/core/storage_test.go index f44e0003c18..f3556b60899 100644 --- a/server/core/storage_test.go +++ b/server/core/storage_test.go @@ -14,8 +14,12 @@ package core import ( + "encoding/json" "fmt" "math" + "path" + "strings" + "time" . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" @@ -194,6 +198,59 @@ func (s *testKVSuite) TestLoadGCSafePoint(c *C) { } } +func (s *testKVSuite) TestSaveServiceGCSafePoint(c *C) { + mem := kv.NewMemoryKV() + storage := NewStorage(mem) + expireAt := time.Now().Add(100 * time.Second).Unix() + serviceSafePoints := []*ServiceSafePoint{ + {"1", expireAt, 1}, + {"2", expireAt, 2}, + {"3", expireAt, 3}, + } + + for _, ssp := range serviceSafePoints { + c.Assert(storage.SaveServiceGCSafePoint(ssp), IsNil) + } + + prefix := path.Join(gcPath, "safe_point", "service") + prefixEnd := path.Join(gcPath, "safe_point", "servicf") + keys, values, err := mem.LoadRange(prefix, prefixEnd, len(serviceSafePoints)) + c.Assert(err, IsNil) + c.Assert(len(keys), Equals, 3) + c.Assert(len(values), Equals, 3) + + ssp := &ServiceSafePoint{} + for i, key := range keys { + c.Assert(strings.HasSuffix(key, serviceSafePoints[i].ServiceID), IsTrue) + + c.Assert(json.Unmarshal([]byte(values[i]), ssp), IsNil) + c.Assert(ssp.ServiceID, Equals, serviceSafePoints[i].ServiceID) + c.Assert(ssp.ExpiredAt, Equals, serviceSafePoints[i].ExpiredAt) + c.Assert(ssp.SafePoint, Equals, serviceSafePoints[i].SafePoint) + } +} + +func (s *testKVSuite) TestLoadMinServiceGCSafePoint(c *C) { + mem := kv.NewMemoryKV() + storage := NewStorage(mem) + expireAt := time.Now().Add(1000 * time.Second).Unix() + serviceSafePoints := []*ServiceSafePoint{ + {"1", 0, 1}, + {"2", expireAt, 2}, + {"3", expireAt, 3}, + } + + for _, ssp := range serviceSafePoints { + c.Assert(storage.SaveServiceGCSafePoint(ssp), IsNil) + } + + ssp, err := storage.LoadMinServiceGCSafePoint() + c.Assert(err, IsNil) + c.Assert(ssp.ServiceID, Equals, "2") + c.Assert(ssp.ExpiredAt, Equals, expireAt) + c.Assert(ssp.SafePoint, Equals, uint64(2)) +} + type KVWithMaxRangeLimit struct { kv.Base rangeLimit int diff --git a/tests/client/client_test.go b/tests/client/client_test.go index c74a41f2bb5..c13158ab4d4 100644 --- a/tests/client/client_test.go +++ b/tests/client/client_test.go @@ -596,6 +596,33 @@ func (s *testClientSuite) TestUpdateGCSafePoint(c *C) { s.checkGCSafePoint(c, math.MaxUint64) } +func (s *testClientSuite) TestUpdateServiceGCSafePoint(c *C) { + serviceSafePoints := []struct { + ServiceID string + TTL int64 + SafePoint uint64 + }{ + {"a", 1000, 1}, + {"b", 1000, 2}, + {"c", 1000, 3}, + } + for _, ssp := range serviceSafePoints { + min, err := s.client.UpdateServiceGCSafePoint(context.Background(), + ssp.ServiceID, 1000, ssp.SafePoint) + c.Assert(err, IsNil) + c.Assert(min, Equals, uint64(1)) + } + min, err := s.client.UpdateServiceGCSafePoint(context.Background(), + "a", 1000, 4) + c.Assert(err, IsNil) + c.Assert(min, Equals, uint64(2)) + + min, err = s.client.UpdateServiceGCSafePoint(context.Background(), + "b", -100, 2) + c.Assert(err, IsNil) + c.Assert(min, Equals, uint64(3)) +} + func (s *testClientSuite) TestScatterRegion(c *C) { regionID := regionIDAllocator.alloc() region := &metapb.Region{ From fd1155c5f9d5d9f02b251ec3738698a6f982e826 Mon Sep 17 00:00:00 2001 From: Shafreeck Sea Date: Fri, 17 Apr 2020 16:59:55 +0800 Subject: [PATCH 06/17] server: remove unused code Signed-off-by: Shafreeck Sea --- server/server.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/server/server.go b/server/server.go index 618557622f4..0df06c4645c 100644 --- a/server/server.go +++ b/server/server.go @@ -141,9 +141,6 @@ type Server struct { // Add callback functions at different stages startCallbacks []func() closeCallbacks []func() - - // The minimum safe point of all services - minSafePoint uint64 } // HandlerBuilder builds a server HTTP handler. @@ -373,13 +370,6 @@ func (s *Server) startServer(ctx context.Context) error { } s.hbStreams = newHeartbeatStreams(ctx, s.clusterID, s.cluster) - // Load the min safe point - minSafePoint, err := s.storage.LoadGCSafePoint() - if err != nil { - return err - } - s.minSafePoint = minSafePoint - // Run callbacks for _, cb := range s.startCallbacks { cb() From a95b521f37820a6eefa1e30b5bd9b6cec14ae13c Mon Sep 17 00:00:00 2001 From: Shafreeck Sea Date: Fri, 17 Apr 2020 17:06:14 +0800 Subject: [PATCH 07/17] storage: remove empty line Signed-off-by: Shafreeck Sea --- server/core/storage.go | 1 - 1 file changed, 1 deletion(-) diff --git a/server/core/storage.go b/server/core/storage.go index b263c0ef56f..72a11b442b6 100644 --- a/server/core/storage.go +++ b/server/core/storage.go @@ -434,7 +434,6 @@ func (s *Storage) LoadMinServiceGCSafePoint() (*ServiceSafePoint, error) { } return min, nil - } // LoadAllScheduleConfig loads all schedulers' config. From 80b353310305e32bb26138df84463a3cae5a9554 Mon Sep 17 00:00:00 2001 From: Shafreeck Sea Date: Fri, 17 Apr 2020 17:15:14 +0800 Subject: [PATCH 08/17] build: update go module Signed-off-by: Shafreeck Sea --- go.mod | 2 -- 1 file changed, 2 deletions(-) diff --git a/go.mod b/go.mod index 0e8798c70af..074d7a589ea 100644 --- a/go.mod +++ b/go.mod @@ -68,5 +68,3 @@ require ( gopkg.in/yaml.v2 v2.2.8 // indirect honnef.co/go/tools v0.0.1-2020.1.3 ) - -replace github.com/pingcap/kvproto => /home/shafreeck/Codes/pingcap/kvproto From eef5bf0ac239f7709e0b7a2cc76b11a27ee6e4c1 Mon Sep 17 00:00:00 2001 From: Shafreeck Sea Date: Fri, 17 Apr 2020 17:29:14 +0800 Subject: [PATCH 09/17] build: update kvproto Signed-off-by: Shafreeck Sea --- go.mod | 2 +- go.sum | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 074d7a589ea..21c0fc8b88d 100644 --- a/go.mod +++ b/go.mod @@ -43,7 +43,7 @@ require ( github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12 github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9 github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d - github.com/pingcap/kvproto v0.0.0-20200409042513-05af14db7537 + github.com/pingcap/kvproto v0.0.0-20200417092353-efbe03bcffbd github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd github.com/pingcap/sysutil v0.0.0-20200408114249-ed3bd6f7fdb1 github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index ce19cbd43cf..c22b04d20e5 100644 --- a/go.sum +++ b/go.sum @@ -124,6 +124,7 @@ github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+ github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gogo/protobuf v0.0.0-20180717141946-636bf0302bc9/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= @@ -137,6 +138,7 @@ github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4er github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff h1:kOkM9whyQYodu09SJ6W3NCsHG7crFaJILQ22Gozp3lg= github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v0.0.0-20180814211427-aa810b61a9c7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -287,6 +289,13 @@ github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011 h1:58naV4XMEqm0h github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d h1:F8vp38kTAckN+v8Jlc98uMBvKIzr1a+UhnLyVYn8Q5Q= github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI= +github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= +github.com/pingcap/kvproto v0.0.0-20200214064158-62d31900d88e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20200409042513-05af14db7537/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20200416032236-29afcedc8234 h1:qOkKmMHs5p6YotBCwNNpPUVsmcr8eNlSrbR3HAdjM5Y= +github.com/pingcap/kvproto v0.0.0-20200416032236-29afcedc8234/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20200417092353-efbe03bcffbd h1:BuTFEyCEm71vUU/3qmndWNWfySS0Jwek1iZ1rQ/4Jqc= +github.com/pingcap/kvproto v0.0.0-20200417092353-efbe03bcffbd/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd h1:CV3VsP3Z02MVtdpTMfEgRJ4T9NGgGTxdHpJerent7rM= @@ -534,10 +543,12 @@ google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9Ywl google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20190927181202-20e1ac93f88c h1:hrpEMCZ2O7DR5gC1n2AJGVhrwiEjOi35+jxtIuZpTMo= google.golang.org/genproto v0.0.0-20190927181202-20e1ac93f88c/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8= +google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= From 3362f14c1cd47bebb759b461b3b544b32ed3f4c1 Mon Sep 17 00:00:00 2001 From: Shafreeck Sea Date: Fri, 17 Apr 2020 17:51:21 +0800 Subject: [PATCH 10/17] safepoint: prevent service safepoint backward Signed-off-by: Shafreeck Sea --- server/grpc_service.go | 17 ++++++++++------- server/server.go | 7 +++++++ tests/client/client_test.go | 6 ++++++ 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/server/grpc_service.go b/server/grpc_service.go index d70e8a1e1de..032ab968096 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -745,19 +745,22 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd return &pdpb.UpdateServiceGCSafePointResponse{Header: s.notBootstrappedHeader()}, nil } - ssp := &core.ServiceSafePoint{ - ServiceID: string(request.ServiceId), - ExpiredAt: time.Now().Unix() + request.TTL*int64(time.Second), - SafePoint: request.SafePoint, - } - if err := s.storage.SaveServiceGCSafePoint(ssp); err != nil { - return nil, err + if request.SafePoint > s.minServiceGCSafePoint { + ssp := &core.ServiceSafePoint{ + ServiceID: string(request.ServiceId), + ExpiredAt: time.Now().Unix() + request.TTL*int64(time.Second), + SafePoint: request.SafePoint, + } + if err := s.storage.SaveServiceGCSafePoint(ssp); err != nil { + return nil, err + } } ssp, err := s.storage.LoadMinServiceGCSafePoint() if err != nil { return nil, err } + atomic.StoreUint64(&s.minServiceGCSafePoint, ssp.SafePoint) return &pdpb.UpdateServiceGCSafePointResponse{ Header: s.header(), diff --git a/server/server.go b/server/server.go index 0df06c4645c..e7a7d1d28e9 100644 --- a/server/server.go +++ b/server/server.go @@ -141,6 +141,8 @@ type Server struct { // Add callback functions at different stages startCallbacks []func() closeCallbacks []func() + + minServiceGCSafePoint uint64 } // HandlerBuilder builds a server HTTP handler. @@ -369,6 +371,11 @@ func (s *Server) startServer(ctx context.Context) error { s.cluster.SetConfigCheck() } s.hbStreams = newHeartbeatStreams(ctx, s.clusterID, s.cluster) + safePoint, err := s.storage.LoadGCSafePoint() + if err != nil { + return err + } + s.minServiceGCSafePoint = safePoint // Run callbacks for _, cb := range s.startCallbacks { diff --git a/tests/client/client_test.go b/tests/client/client_test.go index c13158ab4d4..258a0adb4e8 100644 --- a/tests/client/client_test.go +++ b/tests/client/client_test.go @@ -621,6 +621,12 @@ func (s *testClientSuite) TestUpdateServiceGCSafePoint(c *C) { "b", -100, 2) c.Assert(err, IsNil) c.Assert(min, Equals, uint64(3)) + + // prevent backoff + min, err = s.client.UpdateServiceGCSafePoint(context.Background(), + "b", 1000, 2) + c.Assert(err, IsNil) + c.Assert(min, Equals, uint64(3)) } func (s *testClientSuite) TestScatterRegion(c *C) { From a5a9e973329905ff2d1e727db7485f53c9c7169a Mon Sep 17 00:00:00 2001 From: Shafreeck Sea Date: Fri, 17 Apr 2020 18:42:48 +0800 Subject: [PATCH 11/17] safepoint: fix to prevent backward Signed-off-by: Shafreeck Sea --- server/grpc_service.go | 24 +++++++++++++++--------- server/server.go | 7 ------- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/server/grpc_service.go b/server/grpc_service.go index 032ab968096..b9d53031471 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -745,7 +745,12 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd return &pdpb.UpdateServiceGCSafePointResponse{Header: s.notBootstrappedHeader()}, nil } - if request.SafePoint > s.minServiceGCSafePoint { + min, err := s.storage.LoadMinServiceGCSafePoint() + if err != nil { + return nil, err + } + + if request.SafePoint > min.SafePoint { ssp := &core.ServiceSafePoint{ ServiceID: string(request.ServiceId), ExpiredAt: time.Now().Unix() + request.TTL*int64(time.Second), @@ -755,18 +760,19 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd return nil, err } } - - ssp, err := s.storage.LoadMinServiceGCSafePoint() - if err != nil { - return nil, err + // If the min safepoint is updated, load the next one + if string(request.ServiceId) == min.ServiceID { + min, err = s.storage.LoadMinServiceGCSafePoint() + if err != nil { + return nil, err + } } - atomic.StoreUint64(&s.minServiceGCSafePoint, ssp.SafePoint) return &pdpb.UpdateServiceGCSafePointResponse{ Header: s.header(), - ServiceId: []byte(ssp.ServiceID), - TTL: ssp.ExpiredAt - time.Now().Unix(), - MinSafePoint: ssp.SafePoint, + ServiceId: []byte(min.ServiceID), + TTL: min.ExpiredAt - time.Now().Unix(), + MinSafePoint: min.SafePoint, }, nil } diff --git a/server/server.go b/server/server.go index e7a7d1d28e9..0df06c4645c 100644 --- a/server/server.go +++ b/server/server.go @@ -141,8 +141,6 @@ type Server struct { // Add callback functions at different stages startCallbacks []func() closeCallbacks []func() - - minServiceGCSafePoint uint64 } // HandlerBuilder builds a server HTTP handler. @@ -371,11 +369,6 @@ func (s *Server) startServer(ctx context.Context) error { s.cluster.SetConfigCheck() } s.hbStreams = newHeartbeatStreams(ctx, s.clusterID, s.cluster) - safePoint, err := s.storage.LoadGCSafePoint() - if err != nil { - return err - } - s.minServiceGCSafePoint = safePoint // Run callbacks for _, cb := range s.startCallbacks { From cad49662dc56c0872f9a4b1fa5d0c76b1caaeefb Mon Sep 17 00:00:00 2001 From: Shafreeck Sea Date: Mon, 20 Apr 2020 18:52:08 +0800 Subject: [PATCH 12/17] safepoint: remove a safepoint when its ttl is 0 Signed-off-by: Shafreeck Sea --- go.sum | 3 --- server/core/storage.go | 6 ++++++ server/grpc_service.go | 19 ++++++++++++------- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/go.sum b/go.sum index c22b04d20e5..8cddb34ea95 100644 --- a/go.sum +++ b/go.sum @@ -291,9 +291,6 @@ github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d h1:F8vp38kTAckN+ github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20200214064158-62d31900d88e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20200409042513-05af14db7537/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20200416032236-29afcedc8234 h1:qOkKmMHs5p6YotBCwNNpPUVsmcr8eNlSrbR3HAdjM5Y= -github.com/pingcap/kvproto v0.0.0-20200416032236-29afcedc8234/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20200417092353-efbe03bcffbd h1:BuTFEyCEm71vUU/3qmndWNWfySS0Jwek1iZ1rQ/4Jqc= github.com/pingcap/kvproto v0.0.0-20200417092353-efbe03bcffbd/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA= diff --git a/server/core/storage.go b/server/core/storage.go index 72a11b442b6..33dab82fa9c 100644 --- a/server/core/storage.go +++ b/server/core/storage.go @@ -404,6 +404,12 @@ func (s *Storage) SaveServiceGCSafePoint(ssp *ServiceSafePoint) error { return s.Save(key, string(value)) } +// RemoveServiceGCSafePoint removes a GC safepoint for the service +func (s *Storage) RemoveServiceGCSafePoint(serviceID string) error { + key := path.Join(gcPath, "safe_point", "service", serviceID) + return s.Remove(key) +} + // LoadMinServiceGCSafePoint returns the minimum safepoint across all services func (s *Storage) LoadMinServiceGCSafePoint() (*ServiceSafePoint, error) { prefix := path.Join(gcPath, "safe_point", "service") diff --git a/server/grpc_service.go b/server/grpc_service.go index b9d53031471..b99d067feae 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -744,13 +744,18 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd if rc == nil { return &pdpb.UpdateServiceGCSafePointResponse{Header: s.notBootstrappedHeader()}, nil } + if request.TTL <= 0 { + if err := s.storage.RemoveServiceGCSafePoint(string(request.ServiceId)); err != nil { + return nil, err + } + } min, err := s.storage.LoadMinServiceGCSafePoint() if err != nil { return nil, err } - if request.SafePoint > min.SafePoint { + if request.TTL > 0 && request.SafePoint > min.SafePoint { ssp := &core.ServiceSafePoint{ ServiceID: string(request.ServiceId), ExpiredAt: time.Now().Unix() + request.TTL*int64(time.Second), @@ -759,12 +764,12 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd if err := s.storage.SaveServiceGCSafePoint(ssp); err != nil { return nil, err } - } - // If the min safepoint is updated, load the next one - if string(request.ServiceId) == min.ServiceID { - min, err = s.storage.LoadMinServiceGCSafePoint() - if err != nil { - return nil, err + // If the min safepoint is updated, load the next one + if string(request.ServiceId) == min.ServiceID { + min, err = s.storage.LoadMinServiceGCSafePoint() + if err != nil { + return nil, err + } } } From 16d58d301e0cf7594691a79b06431575fa6dca4b Mon Sep 17 00:00:00 2001 From: Shafreeck Sea Date: Mon, 20 Apr 2020 18:59:55 +0800 Subject: [PATCH 13/17] safepoint: make the method thread safe Signed-off-by: Shafreeck Sea --- server/grpc_service.go | 3 +++ server/server.go | 3 +++ 2 files changed, 6 insertions(+) diff --git a/server/grpc_service.go b/server/grpc_service.go index b99d067feae..98697f5cba8 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -736,6 +736,9 @@ func (s *Server) UpdateGCSafePoint(ctx context.Context, request *pdpb.UpdateGCSa // UpdateServiceGCSafePoint update the safepoint for specific service func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.UpdateServiceGCSafePointRequest) (*pdpb.UpdateServiceGCSafePointResponse, error) { + s.serviceSafePointLock.Lock() + defer s.serviceSafePointLock.Unlock() + if err := s.validateRequest(request.GetHeader()); err != nil { return nil, err } diff --git a/server/server.go b/server/server.go index 0df06c4645c..1dc52c9dfee 100644 --- a/server/server.go +++ b/server/server.go @@ -141,6 +141,9 @@ type Server struct { // Add callback functions at different stages startCallbacks []func() closeCallbacks []func() + + // serviceSafePointLock is a lock for UpdateServiceGCSafePoint + serviceSafePointLock sync.Mutex } // HandlerBuilder builds a server HTTP handler. From 7c3d4f8e9db8250101d3b815efc1b11d995253fe Mon Sep 17 00:00:00 2001 From: Shafreeck Sea Date: Tue, 21 Apr 2020 10:36:50 +0800 Subject: [PATCH 14/17] safepoint: fix the min value when create the first safepoint Signed-off-by: Shafreeck Sea --- server/grpc_service.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/grpc_service.go b/server/grpc_service.go index 98697f5cba8..b63cd5a5e0c 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -774,6 +774,10 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd return nil, err } } + // If ssp is the first safepoint, it is the min value now + if min.SafePoint == 0 { + min = ssp + } } return &pdpb.UpdateServiceGCSafePointResponse{ From 1d77c8143b4bca14b8aab8e1c3a16ac19b6066d0 Mon Sep 17 00:00:00 2001 From: Shafreeck Sea Date: Tue, 21 Apr 2020 11:53:30 +0800 Subject: [PATCH 15/17] safepoint: fix the expiration time Signed-off-by: Shafreeck Sea --- server/grpc_service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/grpc_service.go b/server/grpc_service.go index 4dddf074e2e..42f2a537642 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -755,7 +755,7 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd if request.TTL > 0 && request.SafePoint > min.SafePoint { ssp := &core.ServiceSafePoint{ ServiceID: string(request.ServiceId), - ExpiredAt: time.Now().Unix() + request.TTL*int64(time.Second), + ExpiredAt: time.Now().Unix() + request.TTL, SafePoint: request.SafePoint, } if err := s.storage.SaveServiceGCSafePoint(ssp); err != nil { From 3ac7f504767a46707f1fa2c938f6913ce3c95a0c Mon Sep 17 00:00:00 2001 From: Shafreeck Sea Date: Tue, 21 Apr 2020 13:37:21 +0800 Subject: [PATCH 16/17] build: fix go.sum Signed-off-by: Shafreeck Sea --- go.sum | 1 - 1 file changed, 1 deletion(-) diff --git a/go.sum b/go.sum index 65c71c515e7..8cddb34ea95 100644 --- a/go.sum +++ b/go.sum @@ -297,7 +297,6 @@ github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd h1:CV3VsP3Z02MVtdpTMfEgRJ4T9NGgGTxdHpJerent7rM= github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= -github.com/pingcap/pd v2.1.19+incompatible h1:N/8HOd5yptSJZ1LdBa1bcvYOH9YCetvX4cg5fsGf9+c= github.com/pingcap/sysutil v0.0.0-20200206130906-2bfa6dc40bcd/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI= github.com/pingcap/sysutil v0.0.0-20200408114249-ed3bd6f7fdb1 h1:PI8YpTl45F8ilNkrPtT4IdbcZB1SCEa+gK/U5GJYl3E= github.com/pingcap/sysutil v0.0.0-20200408114249-ed3bd6f7fdb1/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI= From c713470cb25588e96165ff917db48fc15a9e3da4 Mon Sep 17 00:00:00 2001 From: Shafreeck Sea Date: Tue, 21 Apr 2020 17:26:28 +0800 Subject: [PATCH 17/17] safepoint: add log message Signed-off-by: Shafreeck Sea --- server/grpc_service.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/grpc_service.go b/server/grpc_service.go index 42f2a537642..a8d89914445 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -761,6 +761,10 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd if err := s.storage.SaveServiceGCSafePoint(ssp); err != nil { return nil, err } + log.Info("update service GC safe point", + zap.String("service-id", string(ssp.ServiceID)), + zap.Int64("expire-at", ssp.ExpiredAt), + zap.Uint64("safepoint", ssp.SafePoint)) // If the min safepoint is updated, load the next one if string(request.ServiceId) == min.ServiceID { min, err = s.storage.LoadMinServiceGCSafePoint()